diff options
author | Simon Rettberg | 2015-08-28 18:04:16 +0200 |
---|---|---|
committer | Simon Rettberg | 2015-08-28 18:04:16 +0200 |
commit | 10f0687fe551bda88120c2dc2b003035dd9bbea8 (patch) | |
tree | a9a3103c5ca1981bad169a6a1527f0252c4bc76f /dozentenmodulserver/src/main/java/org | |
parent | [client] save the selected download folder and not the generated folder (diff) | |
download | tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.gz tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.xz tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.zip |
[server] Working on image download from master server
Diffstat (limited to 'dozentenmodulserver/src/main/java/org')
11 files changed, 441 insertions, 61 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java index 62000490..86d231ef 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java @@ -82,8 +82,8 @@ public class App { } }); - ThriftManager.setMasterServerAddress(SSLContext.getDefault(), - "bwlp-masterserver.ruf.uni-freiburg.de", 9091, 30000); + ThriftManager.setMasterServerAddress(SSLContext.getDefault(), // TODO: Use the TLSv1.2 one once the master is ready + Configuration.getMasterServerAddress(), 9091, 30000); // Load useful things from master server OrganizationList.get(); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java index 2e2393f8..c03d8322 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java @@ -21,6 +21,7 @@ import org.openslx.bwlp.sat.util.Util; import org.openslx.bwlp.thrift.iface.ImageBaseWrite; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImagePermissions; +import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.ImageSummaryRead; import org.openslx.bwlp.thrift.iface.ImageVersionDetails; import org.openslx.bwlp.thrift.iface.ImageVersionWrite; @@ -283,6 +284,49 @@ public class DbImage { } } + /** + * Create or update a base image with the given publish data. + * Used for replication from master server. + * + * @param user The user who triggered the download, and will be considered + * the creator; if null, the creator of the image will be used + * @param image The image to create + * @throws SQLException + */ + public static void writeBaseImage(UserInfo user, ImagePublishData image) throws SQLException { + if (user == null) { + user = image.user; + } + try (MysqlConnection connection = Database.getConnection()) { + MysqlStatement stmt = connection.prepareStatement("INSERT INTO imagebase" + + " (imagebaseid, displayname, description, osid, virtid, createtime," + + " updatetime, ownerid, updaterid, sharemode, istemplate," + + " canlinkdefault, candownloaddefault, caneditdefault, canadmindefault)" + + " VALUES " + + " (:imagebaseid, :displayname, :description, :osid, :virtid, :unixtime," + + " :unixtime, :userid, :userid, :sharemode, :istemplate," + + " 1, 1, 0, 0) " + + " ON DUPLICATE KEY UPDATE " + + " displayname = VALUES(displayname), description = VALUES(description)," + + " osid = VALUES(osid), virtid = VALUES(virtid), updatetime = VALUES(updatetime)," + + " updaterid = VALUES(updaterid), istemplate = VALUES(istemplate)"); + stmt.setString("imagebaseid", image.imageBaseId); + stmt.setString("displayname", image.imageName); + stmt.setString("description", image.description); + stmt.setInt("osid", image.osId); + stmt.setString("virtid", image.virtId); + stmt.setLong("unixtime", Util.unixTime()); + stmt.setString("userid", user.userId); + stmt.setString("sharemode", "DOWNLOAD"); + stmt.setBoolean("istemplate", image.isTemplate); + stmt.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + LOGGER.error("Query failed in DbImage.writeBaseImage()", e); + throw e; + } + } + public static void updateImageMetadata(UserInfo user, String imageBaseId, ImageBaseWrite image) throws SQLException { if (image.imageName.length() > 100) { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java index 6e8bcf1c..f821813f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java @@ -41,24 +41,26 @@ public class FileServer implements IncomingEvent { private final Listener sslListener; - private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(2, Constants.MAX_UPLOADS + private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(1, Constants.MAX_UPLOADS + Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1)); /** * All currently running uploads, indexed by token */ - private final Map<String, ActiveUpload> uploads = new ConcurrentHashMap<>(); + private final Map<String, IncomingDataTransfer> uploads = new ConcurrentHashMap<>(); /** * All currently running downloads, indexed by token */ - private final Map<String, ActiveDownload> downloads = new ConcurrentHashMap<>(); + private final Map<String, OutgoingDataTransfer> downloads = new ConcurrentHashMap<>(); private static final FileServer globalInstance = new FileServer(); private FileServer() { SSLContext ctx = Identity.getSSLContext(); sslListener = ctx == null ? null : new Listener(this, ctx, 9093, Constants.TRANSFER_TIMEOUT); + LOGGER.info("Max allowed concurrent uploads from clients: " + Constants.MAX_UPLOADS); + LOGGER.info("Max allowed concurrent downloads from clients: " + Constants.MAX_DOWNLOADS); } public static FileServer instance() { @@ -77,7 +79,7 @@ public class FileServer implements IncomingEvent { public void incomingDownloadRequest(Uploader uploader) throws IOException { String token = uploader.getToken(); LOGGER.info("Incoming filetransfer with token " + token); - ActiveDownload download = downloads.get(token); + OutgoingDataTransfer download = downloads.get(token); if (download == null) { LOGGER.warn("Unknown token " + token); uploader.cancel(); @@ -92,7 +94,7 @@ public class FileServer implements IncomingEvent { public void incomingUploadRequest(Downloader downloader) throws IOException { String token = downloader.getToken(); LOGGER.info("Incoming filetransfer with token " + token); - ActiveUpload upload = uploads.get(token); + IncomingDataTransfer upload = uploads.get(token); if (upload == null) { LOGGER.warn("Unknown token " + token); downloader.cancel(); @@ -109,25 +111,25 @@ public class FileServer implements IncomingEvent { * @param uploadToken * @return */ - public ActiveUpload getUploadByToken(String uploadToken) { + public IncomingDataTransfer getUploadByToken(String uploadToken) { if (uploadToken == null) return null; return uploads.get(uploadToken); } - public ActiveDownload getDownloadByToken(String downloadToken) { + public OutgoingDataTransfer getDownloadByToken(String downloadToken) { if (downloadToken == null) return null; return downloads.get(downloadToken); } - public ActiveUpload createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize, + public IncomingDataTransfer createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) throws TTransferRejectedException { - Iterator<ActiveUpload> it = uploads.values().iterator(); + Iterator<IncomingDataTransfer> it = uploads.values().iterator(); final long now = System.currentTimeMillis(); int activeUploads = 0; while (it.hasNext()) { - ActiveUpload upload = it.next(); + IncomingDataTransfer upload = it.next(); if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) { upload.cancel(); it.remove(); @@ -146,9 +148,9 @@ public class FileServer implements IncomingEvent { destinationFile.getParentFile().mkdirs(); String key = UUID.randomUUID().toString(); - ActiveUpload upload; + IncomingDataTransfer upload; try { - upload = new ActiveUpload(key, owner, image, destinationFile, fileSize, sha1Sums, + upload = new IncomingDataTransfer(key, owner, image, destinationFile, fileSize, sha1Sums, machineDescription); } catch (FileNotFoundException e) { LOGGER.error("Could not open destination file for writing", e); @@ -171,13 +173,13 @@ public class FileServer implements IncomingEvent { return sslListener.getPort(); } - public ActiveDownload createNewUserDownload(LocalImageVersion localImageData) + public OutgoingDataTransfer createNewUserDownload(LocalImageVersion localImageData) throws TTransferRejectedException { - Iterator<ActiveDownload> it = downloads.values().iterator(); + Iterator<OutgoingDataTransfer> it = downloads.values().iterator(); final long now = System.currentTimeMillis(); int activeDownloads = 0; while (it.hasNext()) { - ActiveDownload download = it.next(); + OutgoingDataTransfer download = it.next(); if (download.isComplete(now) || download.hasReachedIdleTimeout(now)) { download.cancel(); it.remove(); @@ -215,7 +217,7 @@ public class FileServer implements IncomingEvent { throw new TTransferRejectedException(errorMessage); } String key = UUID.randomUUID().toString(); - ActiveDownload transfer = new ActiveDownload(key, srcFile); + OutgoingDataTransfer transfer = new OutgoingDataTransfer(key, srcFile); downloads.put(key, transfer); return transfer; } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java index 16f85fc1..0ca5d9b6 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java @@ -4,6 +4,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.sql.SQLException; import java.util.ArrayList; @@ -11,15 +12,20 @@ import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.SSLContext; + import org.apache.log4j.Logger; import org.openslx.bwlp.sat.database.mappers.DbImage; +import org.openslx.bwlp.sat.thrift.ThriftUtil; import org.openslx.bwlp.sat.util.Configuration; import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; import org.openslx.bwlp.sat.util.Util; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; +import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.ImageVersionWrite; +import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.bwlp.thrift.iface.UserInfo; @@ -33,9 +39,9 @@ import org.openslx.filetransfer.util.HashChecker; import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; import org.openslx.filetransfer.util.HashChecker.HashResult; -public class ActiveUpload extends AbstractTransfer implements HashCheckCallback { +public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback { - private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class); + private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class); /** * How many concurrent connections per upload @@ -45,10 +51,10 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback /** * Self reference for inner classes. */ - private final ActiveUpload activeUpload = this; + private final IncomingDataTransfer activeUpload = this; /** - * This is an active upload, so on our end, we have a Downloader. + * Remote peer is uploading, so on our end, we have Downloaders */ private List<Downloader> downloads = new ArrayList<>(); @@ -96,6 +102,11 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback */ private boolean fileWritable = true; + /** + * Set if this is a download from the master server + */ + private final TransferInformation masterTransferInfo; + private static final HashChecker hashChecker; static { @@ -108,8 +119,9 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback hashChecker = hc; } - public ActiveUpload(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile, - long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) throws FileNotFoundException { + public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image, + File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) + throws FileNotFoundException { super(uploadId); this.tmpFileName = destinationFile; this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw"); @@ -118,6 +130,76 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback this.image = image; this.fileSize = fileSize; this.machineDescription = machineDescription; + this.masterTransferInfo = null; + } + + public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo) + throws FileNotFoundException { + super(publishData.imageVersionId); + ImageDetailsRead idr = new ImageDetailsRead(); + idr.setCreateTime(publishData.createTime); + idr.setDescription(publishData.description); + idr.setImageBaseId(publishData.imageBaseId); + idr.setImageName(publishData.imageName); + idr.setIsTemplate(publishData.isTemplate); + idr.setLatestVersionId(publishData.imageVersionId); + idr.setOsId(publishData.osId); + idr.setOwnerId(publishData.user.userId); + idr.setTags(publishData.tags); + idr.setUpdaterId(publishData.user.userId); + idr.setUpdateTime(publishData.createTime); + idr.setVirtId(publishData.virtId); + this.tmpFileName = tmpFile; + this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw"); + this.chunks = new ChunkList(publishData.fileSize, hashChecker == null ? null + : ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes)); + this.owner = publishData.user; + this.image = idr; + this.fileSize = publishData.fileSize; + this.machineDescription = transferInfo.machineDescription.getBytes(StandardCharsets.UTF_8); + this.masterTransferInfo = transferInfo; + this.versionSettings = new ImageVersionWrite(false); + } + + /** + * Called periodically if this is a transfer from the master server, so we + * can make sure the transfer is running. + */ + public void heartBeat(ThreadPoolExecutor pool) { + if (masterTransferInfo == null) + return; + synchronized (this) { + synchronized (downloads) { + if (downloads.size() >= 1) // TODO What to pick here? + return; + } + Downloader downloader = null; + if (masterTransferInfo.plainPort != 0) { + try { + downloader = new Downloader(Configuration.getMasterServerAddress(), + masterTransferInfo.plainPort, Constants.TRANSFER_TIMEOUT, null, + masterTransferInfo.token); + } catch (Exception e1) { + LOGGER.debug("Plain connect failed", e1); + downloader = null; + } + } + if (downloader == null && masterTransferInfo.sslPort != 0) { + try { + downloader = new Downloader(Configuration.getMasterServerAddress(), + masterTransferInfo.sslPort, Constants.TRANSFER_TIMEOUT, SSLContext.getDefault(), // TODO: Use the TLSv1.2 one once the master is ready + masterTransferInfo.token); + } catch (Exception e2) { + LOGGER.debug("SSL connect failed", e2); + downloader = null; + } + } + if (downloader == null) { + LOGGER.warn("Could not connect to master server for downloading " + image.imageName); + return; + } + addConnection(downloader, pool); + } } /** @@ -442,4 +524,14 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback return downloads.size(); } + @Override + protected void finalize() { + try { + Util.safeClose(tmpFileHandle); + if (tmpFileName.exists()) + tmpFileName.delete(); + } catch (Throwable t) { + } + } + } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java index 504317f3..e7c6715f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java @@ -9,9 +9,9 @@ import org.apache.log4j.Logger; import org.openslx.bwlp.sat.util.Constants; import org.openslx.filetransfer.Uploader; -public class ActiveDownload extends AbstractTransfer { +public class OutgoingDataTransfer extends AbstractTransfer { - private static final Logger LOGGER = Logger.getLogger(ActiveDownload.class); + private static final Logger LOGGER = Logger.getLogger(OutgoingDataTransfer.class); /** * How many concurrent connections per download @@ -19,7 +19,7 @@ public class ActiveDownload extends AbstractTransfer { private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_DOWNLOADS / 4, 1); /** - * This is a download, so we have uploaders + * Remote peer is downloading, so we have Uploaders */ private List<Uploader> uploads = new ArrayList<>(); @@ -27,12 +27,20 @@ public class ActiveDownload extends AbstractTransfer { private boolean isCanceled = false; - public ActiveDownload(String uuid, File file) { + public OutgoingDataTransfer(String uuid, File file) { super(uuid); this.sourceFile = file; } /** + * Called periodically if this is a transfer from the master server, so we + * can make sure the transfer is running. + */ + public void heartBeat(ThreadPoolExecutor pool) { + // TODO + } + + /** * Add another connection for this file transfer. Currently only one * connection is allowed, but this might change in the future. * diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java new file mode 100644 index 00000000..82c4ab9e --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java @@ -0,0 +1,132 @@ +package org.openslx.bwlp.sat.fileserv; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.openslx.bwlp.sat.util.Constants; +import org.openslx.bwlp.sat.util.Formatter; +import org.openslx.bwlp.thrift.iface.ImagePublishData; +import org.openslx.bwlp.thrift.iface.InvocationError; +import org.openslx.bwlp.thrift.iface.TAuthorizationException; +import org.openslx.bwlp.thrift.iface.TInvocationException; +import org.openslx.bwlp.thrift.iface.TNotFoundException; +import org.openslx.bwlp.thrift.iface.TransferInformation; +import org.openslx.thrifthelper.ThriftManager; +import org.openslx.util.QuickTimer; +import org.openslx.util.QuickTimer.Task; + +/** + * Manages file transfers between this satellite and the master server. + */ +public class SyncTransferHandler { + + private static final Logger LOGGER = Logger.getLogger(SyncTransferHandler.class); + + private static final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(1, + Constants.MAX_MASTER_UPLOADS + Constants.MAX_MASTER_DOWNLOADS, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<Runnable>(1)); + + /** + * All currently running uploads, indexed by token + */ + private static final Map<String, IncomingDataTransfer> downloads = new ConcurrentHashMap<>(); + + /** + * All currently running downloads, indexed by token + */ + private static final Map<String, OutgoingDataTransfer> uploads = new ConcurrentHashMap<>(); + + private static Task heartBeatTask = new Task() { + private final Runnable worker = new Runnable() { + @Override + public void run() { + for (IncomingDataTransfer download : downloads.values()) { + if (download.isActive()) + download.heartBeat(transferPool); + } + for (OutgoingDataTransfer upload : uploads.values()) { + if (upload.isActive()) + upload.heartBeat(transferPool); + } + } + }; + + @Override + public void fire() { + if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 1) + return; + transferPool.execute(worker); + } + }; + + // + + static { + QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56)); + } + + public synchronized static String requestImageDownload(ImagePublishData image) + throws TInvocationException, TAuthorizationException, TNotFoundException { + TransferInformation transferInfo; + // Already replicating this one? + if (downloads.containsKey(image.imageVersionId)) + return image.imageVersionId; + checkDownloadCount(); + try { + transferInfo = ThriftManager.getMasterClient().downloadImage(null, image.imageVersionId); + } catch (TAuthorizationException e) { + LOGGER.warn("Master server rejected our session on downloadImage", e); + throw e; + } catch (TNotFoundException e) { + LOGGER.warn("Master server couldn't find image on downloadImage", e); + throw e; + } catch (TException e) { + LOGGER.warn("Master server made a boo-boo on downloadImage", e); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Communication with master server failed"); + } + File tmpFile = null; + do { + tmpFile = Formatter.getTempImageName(); + } while (tmpFile.exists()); + tmpFile.getParentFile().mkdirs(); + try { + IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo); + downloads.put(transfer.getId(), transfer); + return transfer.getId(); + } catch (FileNotFoundException e) { + LOGGER.warn("Could not open " + tmpFile.getAbsolutePath()); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Could not access local file for writing"); + } + } + + private static void checkDownloadCount() throws TInvocationException { + Iterator<IncomingDataTransfer> it = downloads.values().iterator(); + final long now = System.currentTimeMillis(); + int activeDownloads = 0; + while (it.hasNext()) { + IncomingDataTransfer upload = it.next(); + if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) { + upload.cancel(); + it.remove(); + continue; + } + activeDownloads++; + } + if (activeDownloads > Constants.MAX_MASTER_DOWNLOADS) { + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Server busy. Too many running downloads (" + activeDownloads + "/" + + Constants.MAX_MASTER_DOWNLOADS + ")."); + } + } + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java index 29fa5613..0fc8a365 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java @@ -331,6 +331,27 @@ public class User { "Only the super user can change the expire date of images"); } + public static void canTriggerReplicationOrFail(UserInfo user, String imageVersionId) + throws TAuthorizationException, TInvocationException { + if (isTutor(user)) { + ImageSummaryRead image; + try { + image = getImageFromVersionId(user, imageVersionId); + } catch (TNotFoundException e) { + // If the image is not known locally, allow replication + return; + } + // If it's a remote image, or if the user has edit permissions, allow + if (image.shareMode == ShareMode.DOWNLOAD || image.shareMode == ShareMode.FROZEN + || image.userPermissions.edit) + return; + throw new TAuthorizationException(AuthorizationError.NO_PERMISSION, + "You cannot trigger downloading an image to the satellite server that is not in replication mode"); + } + throw new TAuthorizationException(AuthorizationError.NO_PERMISSION, + "Only tutors can trigger image replication"); + } + public static void setCombinedUserPermissions(ImageSummaryRead image, UserInfo user) { if (hasAllImagePermissions(user, image.ownerId)) { image.userPermissions = imageSu; @@ -435,6 +456,15 @@ public class User { } } + private static ImageSummaryRead getImageFromVersionId(UserInfo user, String imageVersionId) + throws TNotFoundException, TInvocationException { + try { + return DbImage.getImageSummary(user, DbImage.getBaseIdForVersionId(imageVersionId)); + } catch (SQLException e) { + throw new TInvocationException(); + } + } + private static LectureSummary getLectureFromId(UserInfo user, String lectureId) throws TNotFoundException, TInvocationException { try { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java index 533c8acf..3b76efcd 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java @@ -2,7 +2,6 @@ package org.openslx.bwlp.sat.thrift; import java.nio.ByteBuffer; import java.sql.SQLException; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -15,9 +14,10 @@ import org.openslx.bwlp.sat.database.mappers.DbLecture; import org.openslx.bwlp.sat.database.mappers.DbLecturePermissions; import org.openslx.bwlp.sat.database.mappers.DbUser; import org.openslx.bwlp.sat.database.models.LocalUser; -import org.openslx.bwlp.sat.fileserv.ActiveDownload; -import org.openslx.bwlp.sat.fileserv.ActiveUpload; import org.openslx.bwlp.sat.fileserv.FileServer; +import org.openslx.bwlp.sat.fileserv.IncomingDataTransfer; +import org.openslx.bwlp.sat.fileserv.OutgoingDataTransfer; +import org.openslx.bwlp.sat.fileserv.SyncTransferHandler; import org.openslx.bwlp.sat.permissions.User; import org.openslx.bwlp.sat.thrift.cache.OperatingSystemList; import org.openslx.bwlp.sat.thrift.cache.OrganizationList; @@ -29,6 +29,7 @@ import org.openslx.bwlp.thrift.iface.AuthorizationError; import org.openslx.bwlp.thrift.iface.ImageBaseWrite; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImagePermissions; +import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.ImageSummaryRead; import org.openslx.bwlp.thrift.iface.ImageVersionDetails; import org.openslx.bwlp.thrift.iface.ImageVersionWrite; @@ -56,6 +57,7 @@ import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.bwlp.thrift.iface.Virtualizer; import org.openslx.bwlp.thrift.iface.WhoamiInfo; import org.openslx.sat.thrift.version.Version; +import org.openslx.thrifthelper.ThriftManager; public class ServerHandler implements SatelliteServer.Iface { @@ -77,18 +79,6 @@ public class ServerHandler implements SatelliteServer.Iface { * File Transfer */ - private List<byte[]> unwrapHashes(List<ByteBuffer> blockHashes) { - if (blockHashes == null || blockHashes.isEmpty()) - return null; - List<byte[]> hashList = new ArrayList<>(blockHashes.size()); - for (ByteBuffer hash : blockHashes) { - byte[] buffer = new byte[hash.remaining()]; - hash.get(buffer); - hashList.add(buffer); - } - return hashList; - } - @Override public TransferInformation requestImageVersionUpload(String userToken, String imageBaseId, long fileSize, List<ByteBuffer> blockHashes, ByteBuffer machineDescription) throws TTransferRejectedException, @@ -101,31 +91,30 @@ public class ServerHandler implements SatelliteServer.Iface { } catch (SQLException e) { throw new TInvocationException(); } + if (image.shareMode != ShareMode.LOCAL && image.shareMode != ShareMode.PUBLISH) + throw new TAuthorizationException(AuthorizationError.NO_PERMISSION, + "Cannot upload new versions of a replicated image"); // Unwrap machine description - byte[] mDesc = null; - if (machineDescription != null) { - mDesc = new byte[machineDescription.remaining()]; - machineDescription.get(mDesc); - } + byte[] mDesc = ThriftUtil.unwrapByteBuffer(machineDescription); // Unwrap sha1sum list - List<byte[]> hashList = unwrapHashes(blockHashes); - ActiveUpload transfer = fileServer.createNewUserUpload(user, image, fileSize, hashList, mDesc); + List<byte[]> hashList = ThriftUtil.unwrapByteBufferList(blockHashes); + IncomingDataTransfer transfer = fileServer.createNewUserUpload(user, image, fileSize, hashList, mDesc); return new TransferInformation(transfer.getId(), fileServer.getPlainPort(), fileServer.getSslPort()); } @Override public void updateBlockHashes(String uploadToken, List<ByteBuffer> blockHashes) throws TInvalidTokenException { - ActiveUpload upload = fileServer.getUploadByToken(uploadToken); + IncomingDataTransfer upload = fileServer.getUploadByToken(uploadToken); if (upload == null) throw new TInvalidTokenException(); - List<byte[]> hashList = unwrapHashes(blockHashes); + List<byte[]> hashList = ThriftUtil.unwrapByteBufferList(blockHashes); upload.updateBlockHashList(hashList); } @Override public void cancelUpload(String uploadToken) { - ActiveUpload upload = fileServer.getUploadByToken(uploadToken); + IncomingDataTransfer upload = fileServer.getUploadByToken(uploadToken); if (upload != null) upload.cancel(); @@ -133,7 +122,7 @@ public class ServerHandler implements SatelliteServer.Iface { @Override public TransferStatus queryUploadStatus(String uploadToken) throws TInvalidTokenException { - ActiveUpload upload = fileServer.getUploadByToken(uploadToken); + IncomingDataTransfer upload = fileServer.getUploadByToken(uploadToken); if (upload == null) throw new TInvalidTokenException(); return upload.getStatus(); @@ -145,7 +134,7 @@ public class ServerHandler implements SatelliteServer.Iface { TTransferRejectedException { UserInfo user = SessionManager.getOrFail(userToken); User.canDownloadImageVersionOrFail(user, imageVersionId); - ActiveDownload transfer; + OutgoingDataTransfer transfer; try { transfer = fileServer.createNewUserDownload(DbImage.getLocalImageData(imageVersionId)); } catch (SQLException e) { @@ -156,7 +145,7 @@ public class ServerHandler implements SatelliteServer.Iface { @Override public void cancelDownload(String downloadToken) { - ActiveDownload download = fileServer.getDownloadByToken(downloadToken); + OutgoingDataTransfer download = fileServer.getDownloadByToken(downloadToken); if (download != null) download.cancel(); } @@ -312,7 +301,7 @@ public class ServerHandler implements SatelliteServer.Iface { throws TAuthorizationException, TInvocationException, TNotFoundException { UserInfo user = SessionManager.getOrFail(userToken); // Special case: Version is still being uploaded, so there's no entry yet - remember for later - ActiveUpload upload = fileServer.getUploadByToken(imageVersionId); + IncomingDataTransfer upload = fileServer.getUploadByToken(imageVersionId); if (upload != null && upload.setVersionData(user, image)) { return; } @@ -421,8 +410,27 @@ public class ServerHandler implements SatelliteServer.Iface { @Override public String requestImageReplication(String userToken, String imageVersionId) throws TAuthorizationException, TNotFoundException, TInvocationException { - // TODO Auto-generated method stub - return null; + UserInfo user = SessionManager.getOrFail(userToken); + User.canTriggerReplicationOrFail(user, imageVersionId); + // Query master server + ImagePublishData imagePublishData; + try { + imagePublishData = ThriftManager.getMasterClient().getImageData(userToken, imageVersionId); + } catch (TException e) { + LOGGER.error( + "Could not query image data from master server for an image that a client wants to replicate", + e); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Cannot query master server for image information"); + } + // Known by master server; now update/write to DB + try { + DbImage.writeBaseImage(user, imagePublishData); + } catch (SQLException e) { + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Could not write to local DB"); + } + return SyncTransferHandler.requestImageDownload(imagePublishData); } @Override diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java new file mode 100644 index 00000000..ee02f440 --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java @@ -0,0 +1,30 @@ +package org.openslx.bwlp.sat.thrift; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class ThriftUtil { + + public static List<byte[]> unwrapByteBufferList(List<ByteBuffer> blockHashes) { + if (blockHashes == null || blockHashes.isEmpty()) + return null; + List<byte[]> hashList = new ArrayList<>(blockHashes.size()); + for (ByteBuffer hash : blockHashes) { + byte[] buffer = new byte[hash.remaining()]; + hash.get(buffer); + hashList.add(buffer); + } + return hashList; + } + + public static byte[] unwrapByteBuffer(ByteBuffer buffer) { + byte[] byteArray = null; + if (buffer != null) { + byteArray = new byte[buffer.remaining()]; + buffer.get(byteArray); + } + return byteArray; + } + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java index b02d73d0..8afc6a8c 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java @@ -20,6 +20,7 @@ public class Configuration { private static String dbUri; private static String dbUsername; private static String dbPassword; + private static String masterAddress; public static boolean load() throws IOException { // Load configuration from java properties file @@ -36,6 +37,7 @@ public class Configuration { dbUri = prop.getProperty("db.uri"); dbUsername = prop.getProperty("db.username"); dbPassword = prop.getProperty("db.password"); + masterAddress = prop.getProperty("master.address"); // Currently all fields are mandatory but there might be optional settings in the future return vmStoreBasePath != null && dbUri != null && dbUsername != null && dbPassword != null; @@ -69,6 +71,10 @@ public class Configuration { return vmStoreProdPath; } + public static String getMasterServerAddress() { + return masterAddress; + } + // Dynamically Computed fields public static File getCurrentVmStorePath() { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java index 774d1c88..0af135ac 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java @@ -1,22 +1,50 @@ package org.openslx.bwlp.sat.util; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; + +import org.apache.log4j.Logger; import org.openslx.filetransfer.util.FileChunk; public class Constants { + + private static final Logger LOGGER = Logger.getLogger(Constants.class); + public static final String INCOMPLETE_UPLOAD_SUFFIX = ".part"; public static final int MAX_UPLOADS; public static final int MAX_DOWNLOADS; + public static final int MAX_MASTER_UPLOADS = 2; + public static final int MAX_MASTER_DOWNLOADS = 3; public static final int TRANSFER_TIMEOUT = 15 * 1000; // 15s static { long maxMem = Runtime.getRuntime().maxMemory(); if (maxMem == Long.MAX_VALUE) { // Apparently the JVM was started without a memory limit (no -Xmx cmdline), - // so we assume a default of 512MB - maxMem = 512l * 1024l * 1024l; + // so we try a dirty little trick by assuming this is linux and reading it + // from the /proc file system. If that fails too, assume a default of 512MB + try (BufferedReader br = new BufferedReader(new FileReader("/proc/meminfo"))) { + for (String line; (line = br.readLine()) != null;) { + if (line.startsWith("MemTotal:") && line.endsWith("kB")) { + String string = line.replaceAll("[^0-9]", ""); + try { + maxMem = (Long.parseLong(string) / 2l) * 1024l; + LOGGER.debug("Guessing usable JVM memory via /proc/meminfo"); + } catch (Exception e) { + } + break; + } + } + } catch (IOException e) { + } + if (maxMem == Long.MAX_VALUE) { + maxMem = 512l * 1024l * 1024l; + } } maxMem /= 1024l * 1024l; // Now maxMem is the amount of memory in MiB + LOGGER.debug("Maximum JVM memory: " + maxMem + "MiB"); MAX_UPLOADS = (int) Math.max(1, (maxMem - 64) / (FileChunk.CHUNK_SIZE_MIB + 1)); MAX_DOWNLOADS = MAX_UPLOADS * 2; |