summaryrefslogtreecommitdiffstats
path: root/dozentenmodulserver/src/main/java/org
diff options
context:
space:
mode:
authorSimon Rettberg2015-08-28 18:04:16 +0200
committerSimon Rettberg2015-08-28 18:04:16 +0200
commit10f0687fe551bda88120c2dc2b003035dd9bbea8 (patch)
treea9a3103c5ca1981bad169a6a1527f0252c4bc76f /dozentenmodulserver/src/main/java/org
parent[client] save the selected download folder and not the generated folder (diff)
downloadtutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.gz
tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.xz
tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.zip
[server] Working on image download from master server
Diffstat (limited to 'dozentenmodulserver/src/main/java/org')
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java4
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java44
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java34
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java (renamed from dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java)104
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java (renamed from dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java)16
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java132
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java30
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java70
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java30
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java6
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java32
11 files changed, 441 insertions, 61 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java
index 62000490..86d231ef 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java
@@ -82,8 +82,8 @@ public class App {
}
});
- ThriftManager.setMasterServerAddress(SSLContext.getDefault(),
- "bwlp-masterserver.ruf.uni-freiburg.de", 9091, 30000);
+ ThriftManager.setMasterServerAddress(SSLContext.getDefault(), // TODO: Use the TLSv1.2 one once the master is ready
+ Configuration.getMasterServerAddress(), 9091, 30000);
// Load useful things from master server
OrganizationList.get();
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java
index 2e2393f8..c03d8322 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java
@@ -21,6 +21,7 @@ import org.openslx.bwlp.sat.util.Util;
import org.openslx.bwlp.thrift.iface.ImageBaseWrite;
import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
import org.openslx.bwlp.thrift.iface.ImagePermissions;
+import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.ImageSummaryRead;
import org.openslx.bwlp.thrift.iface.ImageVersionDetails;
import org.openslx.bwlp.thrift.iface.ImageVersionWrite;
@@ -283,6 +284,49 @@ public class DbImage {
}
}
+ /**
+ * Create or update a base image with the given publish data.
+ * Used for replication from master server.
+ *
+ * @param user The user who triggered the download, and will be considered
+ * the creator; if null, the creator of the image will be used
+ * @param image The image to create
+ * @throws SQLException
+ */
+ public static void writeBaseImage(UserInfo user, ImagePublishData image) throws SQLException {
+ if (user == null) {
+ user = image.user;
+ }
+ try (MysqlConnection connection = Database.getConnection()) {
+ MysqlStatement stmt = connection.prepareStatement("INSERT INTO imagebase"
+ + " (imagebaseid, displayname, description, osid, virtid, createtime,"
+ + " updatetime, ownerid, updaterid, sharemode, istemplate,"
+ + " canlinkdefault, candownloaddefault, caneditdefault, canadmindefault)"
+ + " VALUES "
+ + " (:imagebaseid, :displayname, :description, :osid, :virtid, :unixtime,"
+ + " :unixtime, :userid, :userid, :sharemode, :istemplate,"
+ + " 1, 1, 0, 0) "
+ + " ON DUPLICATE KEY UPDATE "
+ + " displayname = VALUES(displayname), description = VALUES(description),"
+ + " osid = VALUES(osid), virtid = VALUES(virtid), updatetime = VALUES(updatetime),"
+ + " updaterid = VALUES(updaterid), istemplate = VALUES(istemplate)");
+ stmt.setString("imagebaseid", image.imageBaseId);
+ stmt.setString("displayname", image.imageName);
+ stmt.setString("description", image.description);
+ stmt.setInt("osid", image.osId);
+ stmt.setString("virtid", image.virtId);
+ stmt.setLong("unixtime", Util.unixTime());
+ stmt.setString("userid", user.userId);
+ stmt.setString("sharemode", "DOWNLOAD");
+ stmt.setBoolean("istemplate", image.isTemplate);
+ stmt.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ LOGGER.error("Query failed in DbImage.writeBaseImage()", e);
+ throw e;
+ }
+ }
+
public static void updateImageMetadata(UserInfo user, String imageBaseId, ImageBaseWrite image)
throws SQLException {
if (image.imageName.length() > 100) {
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
index 6e8bcf1c..f821813f 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
@@ -41,24 +41,26 @@ public class FileServer implements IncomingEvent {
private final Listener sslListener;
- private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(2, Constants.MAX_UPLOADS
+ private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(1, Constants.MAX_UPLOADS
+ Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1));
/**
* All currently running uploads, indexed by token
*/
- private final Map<String, ActiveUpload> uploads = new ConcurrentHashMap<>();
+ private final Map<String, IncomingDataTransfer> uploads = new ConcurrentHashMap<>();
/**
* All currently running downloads, indexed by token
*/
- private final Map<String, ActiveDownload> downloads = new ConcurrentHashMap<>();
+ private final Map<String, OutgoingDataTransfer> downloads = new ConcurrentHashMap<>();
private static final FileServer globalInstance = new FileServer();
private FileServer() {
SSLContext ctx = Identity.getSSLContext();
sslListener = ctx == null ? null : new Listener(this, ctx, 9093, Constants.TRANSFER_TIMEOUT);
+ LOGGER.info("Max allowed concurrent uploads from clients: " + Constants.MAX_UPLOADS);
+ LOGGER.info("Max allowed concurrent downloads from clients: " + Constants.MAX_DOWNLOADS);
}
public static FileServer instance() {
@@ -77,7 +79,7 @@ public class FileServer implements IncomingEvent {
public void incomingDownloadRequest(Uploader uploader) throws IOException {
String token = uploader.getToken();
LOGGER.info("Incoming filetransfer with token " + token);
- ActiveDownload download = downloads.get(token);
+ OutgoingDataTransfer download = downloads.get(token);
if (download == null) {
LOGGER.warn("Unknown token " + token);
uploader.cancel();
@@ -92,7 +94,7 @@ public class FileServer implements IncomingEvent {
public void incomingUploadRequest(Downloader downloader) throws IOException {
String token = downloader.getToken();
LOGGER.info("Incoming filetransfer with token " + token);
- ActiveUpload upload = uploads.get(token);
+ IncomingDataTransfer upload = uploads.get(token);
if (upload == null) {
LOGGER.warn("Unknown token " + token);
downloader.cancel();
@@ -109,25 +111,25 @@ public class FileServer implements IncomingEvent {
* @param uploadToken
* @return
*/
- public ActiveUpload getUploadByToken(String uploadToken) {
+ public IncomingDataTransfer getUploadByToken(String uploadToken) {
if (uploadToken == null)
return null;
return uploads.get(uploadToken);
}
- public ActiveDownload getDownloadByToken(String downloadToken) {
+ public OutgoingDataTransfer getDownloadByToken(String downloadToken) {
if (downloadToken == null)
return null;
return downloads.get(downloadToken);
}
- public ActiveUpload createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize,
+ public IncomingDataTransfer createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize,
List<byte[]> sha1Sums, byte[] machineDescription) throws TTransferRejectedException {
- Iterator<ActiveUpload> it = uploads.values().iterator();
+ Iterator<IncomingDataTransfer> it = uploads.values().iterator();
final long now = System.currentTimeMillis();
int activeUploads = 0;
while (it.hasNext()) {
- ActiveUpload upload = it.next();
+ IncomingDataTransfer upload = it.next();
if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) {
upload.cancel();
it.remove();
@@ -146,9 +148,9 @@ public class FileServer implements IncomingEvent {
destinationFile.getParentFile().mkdirs();
String key = UUID.randomUUID().toString();
- ActiveUpload upload;
+ IncomingDataTransfer upload;
try {
- upload = new ActiveUpload(key, owner, image, destinationFile, fileSize, sha1Sums,
+ upload = new IncomingDataTransfer(key, owner, image, destinationFile, fileSize, sha1Sums,
machineDescription);
} catch (FileNotFoundException e) {
LOGGER.error("Could not open destination file for writing", e);
@@ -171,13 +173,13 @@ public class FileServer implements IncomingEvent {
return sslListener.getPort();
}
- public ActiveDownload createNewUserDownload(LocalImageVersion localImageData)
+ public OutgoingDataTransfer createNewUserDownload(LocalImageVersion localImageData)
throws TTransferRejectedException {
- Iterator<ActiveDownload> it = downloads.values().iterator();
+ Iterator<OutgoingDataTransfer> it = downloads.values().iterator();
final long now = System.currentTimeMillis();
int activeDownloads = 0;
while (it.hasNext()) {
- ActiveDownload download = it.next();
+ OutgoingDataTransfer download = it.next();
if (download.isComplete(now) || download.hasReachedIdleTimeout(now)) {
download.cancel();
it.remove();
@@ -215,7 +217,7 @@ public class FileServer implements IncomingEvent {
throw new TTransferRejectedException(errorMessage);
}
String key = UUID.randomUUID().toString();
- ActiveDownload transfer = new ActiveDownload(key, srcFile);
+ OutgoingDataTransfer transfer = new OutgoingDataTransfer(key, srcFile);
downloads.put(key, transfer);
return transfer;
}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
index 16f85fc1..0ca5d9b6 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
@@ -4,6 +4,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -11,15 +12,20 @@ import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLContext;
+
import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.database.mappers.DbImage;
+import org.openslx.bwlp.sat.thrift.ThriftUtil;
import org.openslx.bwlp.sat.util.Configuration;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.sat.util.FileSystem;
import org.openslx.bwlp.sat.util.Formatter;
import org.openslx.bwlp.sat.util.Util;
import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
+import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.ImageVersionWrite;
+import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.bwlp.thrift.iface.UserInfo;
@@ -33,9 +39,9 @@ import org.openslx.filetransfer.util.HashChecker;
import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
import org.openslx.filetransfer.util.HashChecker.HashResult;
-public class ActiveUpload extends AbstractTransfer implements HashCheckCallback {
+public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback {
- private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class);
+ private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class);
/**
* How many concurrent connections per upload
@@ -45,10 +51,10 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback
/**
* Self reference for inner classes.
*/
- private final ActiveUpload activeUpload = this;
+ private final IncomingDataTransfer activeUpload = this;
/**
- * This is an active upload, so on our end, we have a Downloader.
+ * Remote peer is uploading, so on our end, we have Downloaders
*/
private List<Downloader> downloads = new ArrayList<>();
@@ -96,6 +102,11 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback
*/
private boolean fileWritable = true;
+ /**
+ * Set if this is a download from the master server
+ */
+ private final TransferInformation masterTransferInfo;
+
private static final HashChecker hashChecker;
static {
@@ -108,8 +119,9 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback
hashChecker = hc;
}
- public ActiveUpload(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile,
- long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) throws FileNotFoundException {
+ public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image,
+ File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription)
+ throws FileNotFoundException {
super(uploadId);
this.tmpFileName = destinationFile;
this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw");
@@ -118,6 +130,76 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback
this.image = image;
this.fileSize = fileSize;
this.machineDescription = machineDescription;
+ this.masterTransferInfo = null;
+ }
+
+ public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo)
+ throws FileNotFoundException {
+ super(publishData.imageVersionId);
+ ImageDetailsRead idr = new ImageDetailsRead();
+ idr.setCreateTime(publishData.createTime);
+ idr.setDescription(publishData.description);
+ idr.setImageBaseId(publishData.imageBaseId);
+ idr.setImageName(publishData.imageName);
+ idr.setIsTemplate(publishData.isTemplate);
+ idr.setLatestVersionId(publishData.imageVersionId);
+ idr.setOsId(publishData.osId);
+ idr.setOwnerId(publishData.user.userId);
+ idr.setTags(publishData.tags);
+ idr.setUpdaterId(publishData.user.userId);
+ idr.setUpdateTime(publishData.createTime);
+ idr.setVirtId(publishData.virtId);
+ this.tmpFileName = tmpFile;
+ this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw");
+ this.chunks = new ChunkList(publishData.fileSize, hashChecker == null ? null
+ : ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes));
+ this.owner = publishData.user;
+ this.image = idr;
+ this.fileSize = publishData.fileSize;
+ this.machineDescription = transferInfo.machineDescription.getBytes(StandardCharsets.UTF_8);
+ this.masterTransferInfo = transferInfo;
+ this.versionSettings = new ImageVersionWrite(false);
+ }
+
+ /**
+ * Called periodically if this is a transfer from the master server, so we
+ * can make sure the transfer is running.
+ */
+ public void heartBeat(ThreadPoolExecutor pool) {
+ if (masterTransferInfo == null)
+ return;
+ synchronized (this) {
+ synchronized (downloads) {
+ if (downloads.size() >= 1) // TODO What to pick here?
+ return;
+ }
+ Downloader downloader = null;
+ if (masterTransferInfo.plainPort != 0) {
+ try {
+ downloader = new Downloader(Configuration.getMasterServerAddress(),
+ masterTransferInfo.plainPort, Constants.TRANSFER_TIMEOUT, null,
+ masterTransferInfo.token);
+ } catch (Exception e1) {
+ LOGGER.debug("Plain connect failed", e1);
+ downloader = null;
+ }
+ }
+ if (downloader == null && masterTransferInfo.sslPort != 0) {
+ try {
+ downloader = new Downloader(Configuration.getMasterServerAddress(),
+ masterTransferInfo.sslPort, Constants.TRANSFER_TIMEOUT, SSLContext.getDefault(), // TODO: Use the TLSv1.2 one once the master is ready
+ masterTransferInfo.token);
+ } catch (Exception e2) {
+ LOGGER.debug("SSL connect failed", e2);
+ downloader = null;
+ }
+ }
+ if (downloader == null) {
+ LOGGER.warn("Could not connect to master server for downloading " + image.imageName);
+ return;
+ }
+ addConnection(downloader, pool);
+ }
}
/**
@@ -442,4 +524,14 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback
return downloads.size();
}
+ @Override
+ protected void finalize() {
+ try {
+ Util.safeClose(tmpFileHandle);
+ if (tmpFileName.exists())
+ tmpFileName.delete();
+ } catch (Throwable t) {
+ }
+ }
+
}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java
index 504317f3..e7c6715f 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java
@@ -9,9 +9,9 @@ import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.filetransfer.Uploader;
-public class ActiveDownload extends AbstractTransfer {
+public class OutgoingDataTransfer extends AbstractTransfer {
- private static final Logger LOGGER = Logger.getLogger(ActiveDownload.class);
+ private static final Logger LOGGER = Logger.getLogger(OutgoingDataTransfer.class);
/**
* How many concurrent connections per download
@@ -19,7 +19,7 @@ public class ActiveDownload extends AbstractTransfer {
private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_DOWNLOADS / 4, 1);
/**
- * This is a download, so we have uploaders
+ * Remote peer is downloading, so we have Uploaders
*/
private List<Uploader> uploads = new ArrayList<>();
@@ -27,12 +27,20 @@ public class ActiveDownload extends AbstractTransfer {
private boolean isCanceled = false;
- public ActiveDownload(String uuid, File file) {
+ public OutgoingDataTransfer(String uuid, File file) {
super(uuid);
this.sourceFile = file;
}
/**
+ * Called periodically if this is a transfer from the master server, so we
+ * can make sure the transfer is running.
+ */
+ public void heartBeat(ThreadPoolExecutor pool) {
+ // TODO
+ }
+
+ /**
* Add another connection for this file transfer. Currently only one
* connection is allowed, but this might change in the future.
*
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
new file mode 100644
index 00000000..82c4ab9e
--- /dev/null
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
@@ -0,0 +1,132 @@
+package org.openslx.bwlp.sat.fileserv;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.openslx.bwlp.sat.util.Constants;
+import org.openslx.bwlp.sat.util.Formatter;
+import org.openslx.bwlp.thrift.iface.ImagePublishData;
+import org.openslx.bwlp.thrift.iface.InvocationError;
+import org.openslx.bwlp.thrift.iface.TAuthorizationException;
+import org.openslx.bwlp.thrift.iface.TInvocationException;
+import org.openslx.bwlp.thrift.iface.TNotFoundException;
+import org.openslx.bwlp.thrift.iface.TransferInformation;
+import org.openslx.thrifthelper.ThriftManager;
+import org.openslx.util.QuickTimer;
+import org.openslx.util.QuickTimer.Task;
+
+/**
+ * Manages file transfers between this satellite and the master server.
+ */
+public class SyncTransferHandler {
+
+ private static final Logger LOGGER = Logger.getLogger(SyncTransferHandler.class);
+
+ private static final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(1,
+ Constants.MAX_MASTER_UPLOADS + Constants.MAX_MASTER_DOWNLOADS, 1, TimeUnit.MINUTES,
+ new ArrayBlockingQueue<Runnable>(1));
+
+ /**
+ * All currently running uploads, indexed by token
+ */
+ private static final Map<String, IncomingDataTransfer> downloads = new ConcurrentHashMap<>();
+
+ /**
+ * All currently running downloads, indexed by token
+ */
+ private static final Map<String, OutgoingDataTransfer> uploads = new ConcurrentHashMap<>();
+
+ private static Task heartBeatTask = new Task() {
+ private final Runnable worker = new Runnable() {
+ @Override
+ public void run() {
+ for (IncomingDataTransfer download : downloads.values()) {
+ if (download.isActive())
+ download.heartBeat(transferPool);
+ }
+ for (OutgoingDataTransfer upload : uploads.values()) {
+ if (upload.isActive())
+ upload.heartBeat(transferPool);
+ }
+ }
+ };
+
+ @Override
+ public void fire() {
+ if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 1)
+ return;
+ transferPool.execute(worker);
+ }
+ };
+
+ //
+
+ static {
+ QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56));
+ }
+
+ public synchronized static String requestImageDownload(ImagePublishData image)
+ throws TInvocationException, TAuthorizationException, TNotFoundException {
+ TransferInformation transferInfo;
+ // Already replicating this one?
+ if (downloads.containsKey(image.imageVersionId))
+ return image.imageVersionId;
+ checkDownloadCount();
+ try {
+ transferInfo = ThriftManager.getMasterClient().downloadImage(null, image.imageVersionId);
+ } catch (TAuthorizationException e) {
+ LOGGER.warn("Master server rejected our session on downloadImage", e);
+ throw e;
+ } catch (TNotFoundException e) {
+ LOGGER.warn("Master server couldn't find image on downloadImage", e);
+ throw e;
+ } catch (TException e) {
+ LOGGER.warn("Master server made a boo-boo on downloadImage", e);
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
+ "Communication with master server failed");
+ }
+ File tmpFile = null;
+ do {
+ tmpFile = Formatter.getTempImageName();
+ } while (tmpFile.exists());
+ tmpFile.getParentFile().mkdirs();
+ try {
+ IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo);
+ downloads.put(transfer.getId(), transfer);
+ return transfer.getId();
+ } catch (FileNotFoundException e) {
+ LOGGER.warn("Could not open " + tmpFile.getAbsolutePath());
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
+ "Could not access local file for writing");
+ }
+ }
+
+ private static void checkDownloadCount() throws TInvocationException {
+ Iterator<IncomingDataTransfer> it = downloads.values().iterator();
+ final long now = System.currentTimeMillis();
+ int activeDownloads = 0;
+ while (it.hasNext()) {
+ IncomingDataTransfer upload = it.next();
+ if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) {
+ upload.cancel();
+ it.remove();
+ continue;
+ }
+ activeDownloads++;
+ }
+ if (activeDownloads > Constants.MAX_MASTER_DOWNLOADS) {
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
+ "Server busy. Too many running downloads (" + activeDownloads + "/"
+ + Constants.MAX_MASTER_DOWNLOADS + ").");
+ }
+ }
+
+}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java
index 29fa5613..0fc8a365 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java
@@ -331,6 +331,27 @@ public class User {
"Only the super user can change the expire date of images");
}
+ public static void canTriggerReplicationOrFail(UserInfo user, String imageVersionId)
+ throws TAuthorizationException, TInvocationException {
+ if (isTutor(user)) {
+ ImageSummaryRead image;
+ try {
+ image = getImageFromVersionId(user, imageVersionId);
+ } catch (TNotFoundException e) {
+ // If the image is not known locally, allow replication
+ return;
+ }
+ // If it's a remote image, or if the user has edit permissions, allow
+ if (image.shareMode == ShareMode.DOWNLOAD || image.shareMode == ShareMode.FROZEN
+ || image.userPermissions.edit)
+ return;
+ throw new TAuthorizationException(AuthorizationError.NO_PERMISSION,
+ "You cannot trigger downloading an image to the satellite server that is not in replication mode");
+ }
+ throw new TAuthorizationException(AuthorizationError.NO_PERMISSION,
+ "Only tutors can trigger image replication");
+ }
+
public static void setCombinedUserPermissions(ImageSummaryRead image, UserInfo user) {
if (hasAllImagePermissions(user, image.ownerId)) {
image.userPermissions = imageSu;
@@ -435,6 +456,15 @@ public class User {
}
}
+ private static ImageSummaryRead getImageFromVersionId(UserInfo user, String imageVersionId)
+ throws TNotFoundException, TInvocationException {
+ try {
+ return DbImage.getImageSummary(user, DbImage.getBaseIdForVersionId(imageVersionId));
+ } catch (SQLException e) {
+ throw new TInvocationException();
+ }
+ }
+
private static LectureSummary getLectureFromId(UserInfo user, String lectureId)
throws TNotFoundException, TInvocationException {
try {
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java
index 533c8acf..3b76efcd 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java
@@ -2,7 +2,6 @@ package org.openslx.bwlp.sat.thrift;
import java.nio.ByteBuffer;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -15,9 +14,10 @@ import org.openslx.bwlp.sat.database.mappers.DbLecture;
import org.openslx.bwlp.sat.database.mappers.DbLecturePermissions;
import org.openslx.bwlp.sat.database.mappers.DbUser;
import org.openslx.bwlp.sat.database.models.LocalUser;
-import org.openslx.bwlp.sat.fileserv.ActiveDownload;
-import org.openslx.bwlp.sat.fileserv.ActiveUpload;
import org.openslx.bwlp.sat.fileserv.FileServer;
+import org.openslx.bwlp.sat.fileserv.IncomingDataTransfer;
+import org.openslx.bwlp.sat.fileserv.OutgoingDataTransfer;
+import org.openslx.bwlp.sat.fileserv.SyncTransferHandler;
import org.openslx.bwlp.sat.permissions.User;
import org.openslx.bwlp.sat.thrift.cache.OperatingSystemList;
import org.openslx.bwlp.sat.thrift.cache.OrganizationList;
@@ -29,6 +29,7 @@ import org.openslx.bwlp.thrift.iface.AuthorizationError;
import org.openslx.bwlp.thrift.iface.ImageBaseWrite;
import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
import org.openslx.bwlp.thrift.iface.ImagePermissions;
+import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.ImageSummaryRead;
import org.openslx.bwlp.thrift.iface.ImageVersionDetails;
import org.openslx.bwlp.thrift.iface.ImageVersionWrite;
@@ -56,6 +57,7 @@ import org.openslx.bwlp.thrift.iface.UserInfo;
import org.openslx.bwlp.thrift.iface.Virtualizer;
import org.openslx.bwlp.thrift.iface.WhoamiInfo;
import org.openslx.sat.thrift.version.Version;
+import org.openslx.thrifthelper.ThriftManager;
public class ServerHandler implements SatelliteServer.Iface {
@@ -77,18 +79,6 @@ public class ServerHandler implements SatelliteServer.Iface {
* File Transfer
*/
- private List<byte[]> unwrapHashes(List<ByteBuffer> blockHashes) {
- if (blockHashes == null || blockHashes.isEmpty())
- return null;
- List<byte[]> hashList = new ArrayList<>(blockHashes.size());
- for (ByteBuffer hash : blockHashes) {
- byte[] buffer = new byte[hash.remaining()];
- hash.get(buffer);
- hashList.add(buffer);
- }
- return hashList;
- }
-
@Override
public TransferInformation requestImageVersionUpload(String userToken, String imageBaseId, long fileSize,
List<ByteBuffer> blockHashes, ByteBuffer machineDescription) throws TTransferRejectedException,
@@ -101,31 +91,30 @@ public class ServerHandler implements SatelliteServer.Iface {
} catch (SQLException e) {
throw new TInvocationException();
}
+ if (image.shareMode != ShareMode.LOCAL && image.shareMode != ShareMode.PUBLISH)
+ throw new TAuthorizationException(AuthorizationError.NO_PERMISSION,
+ "Cannot upload new versions of a replicated image");
// Unwrap machine description
- byte[] mDesc = null;
- if (machineDescription != null) {
- mDesc = new byte[machineDescription.remaining()];
- machineDescription.get(mDesc);
- }
+ byte[] mDesc = ThriftUtil.unwrapByteBuffer(machineDescription);
// Unwrap sha1sum list
- List<byte[]> hashList = unwrapHashes(blockHashes);
- ActiveUpload transfer = fileServer.createNewUserUpload(user, image, fileSize, hashList, mDesc);
+ List<byte[]> hashList = ThriftUtil.unwrapByteBufferList(blockHashes);
+ IncomingDataTransfer transfer = fileServer.createNewUserUpload(user, image, fileSize, hashList, mDesc);
return new TransferInformation(transfer.getId(), fileServer.getPlainPort(), fileServer.getSslPort());
}
@Override
public void updateBlockHashes(String uploadToken, List<ByteBuffer> blockHashes)
throws TInvalidTokenException {
- ActiveUpload upload = fileServer.getUploadByToken(uploadToken);
+ IncomingDataTransfer upload = fileServer.getUploadByToken(uploadToken);
if (upload == null)
throw new TInvalidTokenException();
- List<byte[]> hashList = unwrapHashes(blockHashes);
+ List<byte[]> hashList = ThriftUtil.unwrapByteBufferList(blockHashes);
upload.updateBlockHashList(hashList);
}
@Override
public void cancelUpload(String uploadToken) {
- ActiveUpload upload = fileServer.getUploadByToken(uploadToken);
+ IncomingDataTransfer upload = fileServer.getUploadByToken(uploadToken);
if (upload != null)
upload.cancel();
@@ -133,7 +122,7 @@ public class ServerHandler implements SatelliteServer.Iface {
@Override
public TransferStatus queryUploadStatus(String uploadToken) throws TInvalidTokenException {
- ActiveUpload upload = fileServer.getUploadByToken(uploadToken);
+ IncomingDataTransfer upload = fileServer.getUploadByToken(uploadToken);
if (upload == null)
throw new TInvalidTokenException();
return upload.getStatus();
@@ -145,7 +134,7 @@ public class ServerHandler implements SatelliteServer.Iface {
TTransferRejectedException {
UserInfo user = SessionManager.getOrFail(userToken);
User.canDownloadImageVersionOrFail(user, imageVersionId);
- ActiveDownload transfer;
+ OutgoingDataTransfer transfer;
try {
transfer = fileServer.createNewUserDownload(DbImage.getLocalImageData(imageVersionId));
} catch (SQLException e) {
@@ -156,7 +145,7 @@ public class ServerHandler implements SatelliteServer.Iface {
@Override
public void cancelDownload(String downloadToken) {
- ActiveDownload download = fileServer.getDownloadByToken(downloadToken);
+ OutgoingDataTransfer download = fileServer.getDownloadByToken(downloadToken);
if (download != null)
download.cancel();
}
@@ -312,7 +301,7 @@ public class ServerHandler implements SatelliteServer.Iface {
throws TAuthorizationException, TInvocationException, TNotFoundException {
UserInfo user = SessionManager.getOrFail(userToken);
// Special case: Version is still being uploaded, so there's no entry yet - remember for later
- ActiveUpload upload = fileServer.getUploadByToken(imageVersionId);
+ IncomingDataTransfer upload = fileServer.getUploadByToken(imageVersionId);
if (upload != null && upload.setVersionData(user, image)) {
return;
}
@@ -421,8 +410,27 @@ public class ServerHandler implements SatelliteServer.Iface {
@Override
public String requestImageReplication(String userToken, String imageVersionId)
throws TAuthorizationException, TNotFoundException, TInvocationException {
- // TODO Auto-generated method stub
- return null;
+ UserInfo user = SessionManager.getOrFail(userToken);
+ User.canTriggerReplicationOrFail(user, imageVersionId);
+ // Query master server
+ ImagePublishData imagePublishData;
+ try {
+ imagePublishData = ThriftManager.getMasterClient().getImageData(userToken, imageVersionId);
+ } catch (TException e) {
+ LOGGER.error(
+ "Could not query image data from master server for an image that a client wants to replicate",
+ e);
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
+ "Cannot query master server for image information");
+ }
+ // Known by master server; now update/write to DB
+ try {
+ DbImage.writeBaseImage(user, imagePublishData);
+ } catch (SQLException e) {
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
+ "Could not write to local DB");
+ }
+ return SyncTransferHandler.requestImageDownload(imagePublishData);
}
@Override
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java
new file mode 100644
index 00000000..ee02f440
--- /dev/null
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java
@@ -0,0 +1,30 @@
+package org.openslx.bwlp.sat.thrift;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ThriftUtil {
+
+ public static List<byte[]> unwrapByteBufferList(List<ByteBuffer> blockHashes) {
+ if (blockHashes == null || blockHashes.isEmpty())
+ return null;
+ List<byte[]> hashList = new ArrayList<>(blockHashes.size());
+ for (ByteBuffer hash : blockHashes) {
+ byte[] buffer = new byte[hash.remaining()];
+ hash.get(buffer);
+ hashList.add(buffer);
+ }
+ return hashList;
+ }
+
+ public static byte[] unwrapByteBuffer(ByteBuffer buffer) {
+ byte[] byteArray = null;
+ if (buffer != null) {
+ byteArray = new byte[buffer.remaining()];
+ buffer.get(byteArray);
+ }
+ return byteArray;
+ }
+
+}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java
index b02d73d0..8afc6a8c 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java
@@ -20,6 +20,7 @@ public class Configuration {
private static String dbUri;
private static String dbUsername;
private static String dbPassword;
+ private static String masterAddress;
public static boolean load() throws IOException {
// Load configuration from java properties file
@@ -36,6 +37,7 @@ public class Configuration {
dbUri = prop.getProperty("db.uri");
dbUsername = prop.getProperty("db.username");
dbPassword = prop.getProperty("db.password");
+ masterAddress = prop.getProperty("master.address");
// Currently all fields are mandatory but there might be optional settings in the future
return vmStoreBasePath != null && dbUri != null && dbUsername != null && dbPassword != null;
@@ -69,6 +71,10 @@ public class Configuration {
return vmStoreProdPath;
}
+ public static String getMasterServerAddress() {
+ return masterAddress;
+ }
+
// Dynamically Computed fields
public static File getCurrentVmStorePath() {
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java
index 774d1c88..0af135ac 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java
@@ -1,22 +1,50 @@
package org.openslx.bwlp.sat.util;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
import org.openslx.filetransfer.util.FileChunk;
public class Constants {
+
+ private static final Logger LOGGER = Logger.getLogger(Constants.class);
+
public static final String INCOMPLETE_UPLOAD_SUFFIX = ".part";
public static final int MAX_UPLOADS;
public static final int MAX_DOWNLOADS;
+ public static final int MAX_MASTER_UPLOADS = 2;
+ public static final int MAX_MASTER_DOWNLOADS = 3;
public static final int TRANSFER_TIMEOUT = 15 * 1000; // 15s
static {
long maxMem = Runtime.getRuntime().maxMemory();
if (maxMem == Long.MAX_VALUE) {
// Apparently the JVM was started without a memory limit (no -Xmx cmdline),
- // so we assume a default of 512MB
- maxMem = 512l * 1024l * 1024l;
+ // so we try a dirty little trick by assuming this is linux and reading it
+ // from the /proc file system. If that fails too, assume a default of 512MB
+ try (BufferedReader br = new BufferedReader(new FileReader("/proc/meminfo"))) {
+ for (String line; (line = br.readLine()) != null;) {
+ if (line.startsWith("MemTotal:") && line.endsWith("kB")) {
+ String string = line.replaceAll("[^0-9]", "");
+ try {
+ maxMem = (Long.parseLong(string) / 2l) * 1024l;
+ LOGGER.debug("Guessing usable JVM memory via /proc/meminfo");
+ } catch (Exception e) {
+ }
+ break;
+ }
+ }
+ } catch (IOException e) {
+ }
+ if (maxMem == Long.MAX_VALUE) {
+ maxMem = 512l * 1024l * 1024l;
+ }
}
maxMem /= 1024l * 1024l;
// Now maxMem is the amount of memory in MiB
+ LOGGER.debug("Maximum JVM memory: " + maxMem + "MiB");
MAX_UPLOADS = (int) Math.max(1, (maxMem - 64) / (FileChunk.CHUNK_SIZE_MIB + 1));
MAX_DOWNLOADS = MAX_UPLOADS * 2;