summaryrefslogtreecommitdiffstats
path: root/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv
diff options
context:
space:
mode:
authorSimon Rettberg2015-08-28 18:04:16 +0200
committerSimon Rettberg2015-08-28 18:04:16 +0200
commit10f0687fe551bda88120c2dc2b003035dd9bbea8 (patch)
treea9a3103c5ca1981bad169a6a1527f0252c4bc76f /dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv
parent[client] save the selected download folder and not the generated folder (diff)
downloadtutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.gz
tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.xz
tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.zip
[server] Working on image download from master server
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv')
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java34
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java (renamed from dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java)104
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java (renamed from dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java)16
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java132
4 files changed, 260 insertions, 26 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
index 6e8bcf1c..f821813f 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
@@ -41,24 +41,26 @@ public class FileServer implements IncomingEvent {
private final Listener sslListener;
- private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(2, Constants.MAX_UPLOADS
+ private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(1, Constants.MAX_UPLOADS
+ Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1));
/**
* All currently running uploads, indexed by token
*/
- private final Map<String, ActiveUpload> uploads = new ConcurrentHashMap<>();
+ private final Map<String, IncomingDataTransfer> uploads = new ConcurrentHashMap<>();
/**
* All currently running downloads, indexed by token
*/
- private final Map<String, ActiveDownload> downloads = new ConcurrentHashMap<>();
+ private final Map<String, OutgoingDataTransfer> downloads = new ConcurrentHashMap<>();
private static final FileServer globalInstance = new FileServer();
private FileServer() {
SSLContext ctx = Identity.getSSLContext();
sslListener = ctx == null ? null : new Listener(this, ctx, 9093, Constants.TRANSFER_TIMEOUT);
+ LOGGER.info("Max allowed concurrent uploads from clients: " + Constants.MAX_UPLOADS);
+ LOGGER.info("Max allowed concurrent downloads from clients: " + Constants.MAX_DOWNLOADS);
}
public static FileServer instance() {
@@ -77,7 +79,7 @@ public class FileServer implements IncomingEvent {
public void incomingDownloadRequest(Uploader uploader) throws IOException {
String token = uploader.getToken();
LOGGER.info("Incoming filetransfer with token " + token);
- ActiveDownload download = downloads.get(token);
+ OutgoingDataTransfer download = downloads.get(token);
if (download == null) {
LOGGER.warn("Unknown token " + token);
uploader.cancel();
@@ -92,7 +94,7 @@ public class FileServer implements IncomingEvent {
public void incomingUploadRequest(Downloader downloader) throws IOException {
String token = downloader.getToken();
LOGGER.info("Incoming filetransfer with token " + token);
- ActiveUpload upload = uploads.get(token);
+ IncomingDataTransfer upload = uploads.get(token);
if (upload == null) {
LOGGER.warn("Unknown token " + token);
downloader.cancel();
@@ -109,25 +111,25 @@ public class FileServer implements IncomingEvent {
* @param uploadToken
* @return
*/
- public ActiveUpload getUploadByToken(String uploadToken) {
+ public IncomingDataTransfer getUploadByToken(String uploadToken) {
if (uploadToken == null)
return null;
return uploads.get(uploadToken);
}
- public ActiveDownload getDownloadByToken(String downloadToken) {
+ public OutgoingDataTransfer getDownloadByToken(String downloadToken) {
if (downloadToken == null)
return null;
return downloads.get(downloadToken);
}
- public ActiveUpload createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize,
+ public IncomingDataTransfer createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize,
List<byte[]> sha1Sums, byte[] machineDescription) throws TTransferRejectedException {
- Iterator<ActiveUpload> it = uploads.values().iterator();
+ Iterator<IncomingDataTransfer> it = uploads.values().iterator();
final long now = System.currentTimeMillis();
int activeUploads = 0;
while (it.hasNext()) {
- ActiveUpload upload = it.next();
+ IncomingDataTransfer upload = it.next();
if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) {
upload.cancel();
it.remove();
@@ -146,9 +148,9 @@ public class FileServer implements IncomingEvent {
destinationFile.getParentFile().mkdirs();
String key = UUID.randomUUID().toString();
- ActiveUpload upload;
+ IncomingDataTransfer upload;
try {
- upload = new ActiveUpload(key, owner, image, destinationFile, fileSize, sha1Sums,
+ upload = new IncomingDataTransfer(key, owner, image, destinationFile, fileSize, sha1Sums,
machineDescription);
} catch (FileNotFoundException e) {
LOGGER.error("Could not open destination file for writing", e);
@@ -171,13 +173,13 @@ public class FileServer implements IncomingEvent {
return sslListener.getPort();
}
- public ActiveDownload createNewUserDownload(LocalImageVersion localImageData)
+ public OutgoingDataTransfer createNewUserDownload(LocalImageVersion localImageData)
throws TTransferRejectedException {
- Iterator<ActiveDownload> it = downloads.values().iterator();
+ Iterator<OutgoingDataTransfer> it = downloads.values().iterator();
final long now = System.currentTimeMillis();
int activeDownloads = 0;
while (it.hasNext()) {
- ActiveDownload download = it.next();
+ OutgoingDataTransfer download = it.next();
if (download.isComplete(now) || download.hasReachedIdleTimeout(now)) {
download.cancel();
it.remove();
@@ -215,7 +217,7 @@ public class FileServer implements IncomingEvent {
throw new TTransferRejectedException(errorMessage);
}
String key = UUID.randomUUID().toString();
- ActiveDownload transfer = new ActiveDownload(key, srcFile);
+ OutgoingDataTransfer transfer = new OutgoingDataTransfer(key, srcFile);
downloads.put(key, transfer);
return transfer;
}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
index 16f85fc1..0ca5d9b6 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
@@ -4,6 +4,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -11,15 +12,20 @@ import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLContext;
+
import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.database.mappers.DbImage;
+import org.openslx.bwlp.sat.thrift.ThriftUtil;
import org.openslx.bwlp.sat.util.Configuration;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.sat.util.FileSystem;
import org.openslx.bwlp.sat.util.Formatter;
import org.openslx.bwlp.sat.util.Util;
import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
+import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.ImageVersionWrite;
+import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.bwlp.thrift.iface.UserInfo;
@@ -33,9 +39,9 @@ import org.openslx.filetransfer.util.HashChecker;
import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
import org.openslx.filetransfer.util.HashChecker.HashResult;
-public class ActiveUpload extends AbstractTransfer implements HashCheckCallback {
+public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback {
- private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class);
+ private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class);
/**
* How many concurrent connections per upload
@@ -45,10 +51,10 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback
/**
* Self reference for inner classes.
*/
- private final ActiveUpload activeUpload = this;
+ private final IncomingDataTransfer activeUpload = this;
/**
- * This is an active upload, so on our end, we have a Downloader.
+ * Remote peer is uploading, so on our end, we have Downloaders
*/
private List<Downloader> downloads = new ArrayList<>();
@@ -96,6 +102,11 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback
*/
private boolean fileWritable = true;
+ /**
+ * Set if this is a download from the master server
+ */
+ private final TransferInformation masterTransferInfo;
+
private static final HashChecker hashChecker;
static {
@@ -108,8 +119,9 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback
hashChecker = hc;
}
- public ActiveUpload(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile,
- long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) throws FileNotFoundException {
+ public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image,
+ File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription)
+ throws FileNotFoundException {
super(uploadId);
this.tmpFileName = destinationFile;
this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw");
@@ -118,6 +130,76 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback
this.image = image;
this.fileSize = fileSize;
this.machineDescription = machineDescription;
+ this.masterTransferInfo = null;
+ }
+
+ public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo)
+ throws FileNotFoundException {
+ super(publishData.imageVersionId);
+ ImageDetailsRead idr = new ImageDetailsRead();
+ idr.setCreateTime(publishData.createTime);
+ idr.setDescription(publishData.description);
+ idr.setImageBaseId(publishData.imageBaseId);
+ idr.setImageName(publishData.imageName);
+ idr.setIsTemplate(publishData.isTemplate);
+ idr.setLatestVersionId(publishData.imageVersionId);
+ idr.setOsId(publishData.osId);
+ idr.setOwnerId(publishData.user.userId);
+ idr.setTags(publishData.tags);
+ idr.setUpdaterId(publishData.user.userId);
+ idr.setUpdateTime(publishData.createTime);
+ idr.setVirtId(publishData.virtId);
+ this.tmpFileName = tmpFile;
+ this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw");
+ this.chunks = new ChunkList(publishData.fileSize, hashChecker == null ? null
+ : ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes));
+ this.owner = publishData.user;
+ this.image = idr;
+ this.fileSize = publishData.fileSize;
+ this.machineDescription = transferInfo.machineDescription.getBytes(StandardCharsets.UTF_8);
+ this.masterTransferInfo = transferInfo;
+ this.versionSettings = new ImageVersionWrite(false);
+ }
+
+ /**
+ * Called periodically if this is a transfer from the master server, so we
+ * can make sure the transfer is running.
+ */
+ public void heartBeat(ThreadPoolExecutor pool) {
+ if (masterTransferInfo == null)
+ return;
+ synchronized (this) {
+ synchronized (downloads) {
+ if (downloads.size() >= 1) // TODO What to pick here?
+ return;
+ }
+ Downloader downloader = null;
+ if (masterTransferInfo.plainPort != 0) {
+ try {
+ downloader = new Downloader(Configuration.getMasterServerAddress(),
+ masterTransferInfo.plainPort, Constants.TRANSFER_TIMEOUT, null,
+ masterTransferInfo.token);
+ } catch (Exception e1) {
+ LOGGER.debug("Plain connect failed", e1);
+ downloader = null;
+ }
+ }
+ if (downloader == null && masterTransferInfo.sslPort != 0) {
+ try {
+ downloader = new Downloader(Configuration.getMasterServerAddress(),
+ masterTransferInfo.sslPort, Constants.TRANSFER_TIMEOUT, SSLContext.getDefault(), // TODO: Use the TLSv1.2 one once the master is ready
+ masterTransferInfo.token);
+ } catch (Exception e2) {
+ LOGGER.debug("SSL connect failed", e2);
+ downloader = null;
+ }
+ }
+ if (downloader == null) {
+ LOGGER.warn("Could not connect to master server for downloading " + image.imageName);
+ return;
+ }
+ addConnection(downloader, pool);
+ }
}
/**
@@ -442,4 +524,14 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback
return downloads.size();
}
+ @Override
+ protected void finalize() {
+ try {
+ Util.safeClose(tmpFileHandle);
+ if (tmpFileName.exists())
+ tmpFileName.delete();
+ } catch (Throwable t) {
+ }
+ }
+
}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java
index 504317f3..e7c6715f 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java
@@ -9,9 +9,9 @@ import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.filetransfer.Uploader;
-public class ActiveDownload extends AbstractTransfer {
+public class OutgoingDataTransfer extends AbstractTransfer {
- private static final Logger LOGGER = Logger.getLogger(ActiveDownload.class);
+ private static final Logger LOGGER = Logger.getLogger(OutgoingDataTransfer.class);
/**
* How many concurrent connections per download
@@ -19,7 +19,7 @@ public class ActiveDownload extends AbstractTransfer {
private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_DOWNLOADS / 4, 1);
/**
- * This is a download, so we have uploaders
+ * Remote peer is downloading, so we have Uploaders
*/
private List<Uploader> uploads = new ArrayList<>();
@@ -27,12 +27,20 @@ public class ActiveDownload extends AbstractTransfer {
private boolean isCanceled = false;
- public ActiveDownload(String uuid, File file) {
+ public OutgoingDataTransfer(String uuid, File file) {
super(uuid);
this.sourceFile = file;
}
/**
+ * Called periodically if this is a transfer from the master server, so we
+ * can make sure the transfer is running.
+ */
+ public void heartBeat(ThreadPoolExecutor pool) {
+ // TODO
+ }
+
+ /**
* Add another connection for this file transfer. Currently only one
* connection is allowed, but this might change in the future.
*
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
new file mode 100644
index 00000000..82c4ab9e
--- /dev/null
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
@@ -0,0 +1,132 @@
+package org.openslx.bwlp.sat.fileserv;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.openslx.bwlp.sat.util.Constants;
+import org.openslx.bwlp.sat.util.Formatter;
+import org.openslx.bwlp.thrift.iface.ImagePublishData;
+import org.openslx.bwlp.thrift.iface.InvocationError;
+import org.openslx.bwlp.thrift.iface.TAuthorizationException;
+import org.openslx.bwlp.thrift.iface.TInvocationException;
+import org.openslx.bwlp.thrift.iface.TNotFoundException;
+import org.openslx.bwlp.thrift.iface.TransferInformation;
+import org.openslx.thrifthelper.ThriftManager;
+import org.openslx.util.QuickTimer;
+import org.openslx.util.QuickTimer.Task;
+
+/**
+ * Manages file transfers between this satellite and the master server.
+ */
+public class SyncTransferHandler {
+
+ private static final Logger LOGGER = Logger.getLogger(SyncTransferHandler.class);
+
+ private static final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(1,
+ Constants.MAX_MASTER_UPLOADS + Constants.MAX_MASTER_DOWNLOADS, 1, TimeUnit.MINUTES,
+ new ArrayBlockingQueue<Runnable>(1));
+
+ /**
+ * All currently running uploads, indexed by token
+ */
+ private static final Map<String, IncomingDataTransfer> downloads = new ConcurrentHashMap<>();
+
+ /**
+ * All currently running downloads, indexed by token
+ */
+ private static final Map<String, OutgoingDataTransfer> uploads = new ConcurrentHashMap<>();
+
+ private static Task heartBeatTask = new Task() {
+ private final Runnable worker = new Runnable() {
+ @Override
+ public void run() {
+ for (IncomingDataTransfer download : downloads.values()) {
+ if (download.isActive())
+ download.heartBeat(transferPool);
+ }
+ for (OutgoingDataTransfer upload : uploads.values()) {
+ if (upload.isActive())
+ upload.heartBeat(transferPool);
+ }
+ }
+ };
+
+ @Override
+ public void fire() {
+ if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 1)
+ return;
+ transferPool.execute(worker);
+ }
+ };
+
+ //
+
+ static {
+ QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56));
+ }
+
+ public synchronized static String requestImageDownload(ImagePublishData image)
+ throws TInvocationException, TAuthorizationException, TNotFoundException {
+ TransferInformation transferInfo;
+ // Already replicating this one?
+ if (downloads.containsKey(image.imageVersionId))
+ return image.imageVersionId;
+ checkDownloadCount();
+ try {
+ transferInfo = ThriftManager.getMasterClient().downloadImage(null, image.imageVersionId);
+ } catch (TAuthorizationException e) {
+ LOGGER.warn("Master server rejected our session on downloadImage", e);
+ throw e;
+ } catch (TNotFoundException e) {
+ LOGGER.warn("Master server couldn't find image on downloadImage", e);
+ throw e;
+ } catch (TException e) {
+ LOGGER.warn("Master server made a boo-boo on downloadImage", e);
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
+ "Communication with master server failed");
+ }
+ File tmpFile = null;
+ do {
+ tmpFile = Formatter.getTempImageName();
+ } while (tmpFile.exists());
+ tmpFile.getParentFile().mkdirs();
+ try {
+ IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo);
+ downloads.put(transfer.getId(), transfer);
+ return transfer.getId();
+ } catch (FileNotFoundException e) {
+ LOGGER.warn("Could not open " + tmpFile.getAbsolutePath());
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
+ "Could not access local file for writing");
+ }
+ }
+
+ private static void checkDownloadCount() throws TInvocationException {
+ Iterator<IncomingDataTransfer> it = downloads.values().iterator();
+ final long now = System.currentTimeMillis();
+ int activeDownloads = 0;
+ while (it.hasNext()) {
+ IncomingDataTransfer upload = it.next();
+ if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) {
+ upload.cancel();
+ it.remove();
+ continue;
+ }
+ activeDownloads++;
+ }
+ if (activeDownloads > Constants.MAX_MASTER_DOWNLOADS) {
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
+ "Server busy. Too many running downloads (" + activeDownloads + "/"
+ + Constants.MAX_MASTER_DOWNLOADS + ").");
+ }
+ }
+
+}