diff options
author | Simon Rettberg | 2015-08-28 18:04:16 +0200 |
---|---|---|
committer | Simon Rettberg | 2015-08-28 18:04:16 +0200 |
commit | 10f0687fe551bda88120c2dc2b003035dd9bbea8 (patch) | |
tree | a9a3103c5ca1981bad169a6a1527f0252c4bc76f /dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv | |
parent | [client] save the selected download folder and not the generated folder (diff) | |
download | tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.gz tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.xz tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.zip |
[server] Working on image download from master server
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv')
-rw-r--r-- | dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java | 34 | ||||
-rw-r--r-- | dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java (renamed from dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java) | 104 | ||||
-rw-r--r-- | dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java (renamed from dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java) | 16 | ||||
-rw-r--r-- | dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java | 132 |
4 files changed, 260 insertions, 26 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java index 6e8bcf1c..f821813f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java @@ -41,24 +41,26 @@ public class FileServer implements IncomingEvent { private final Listener sslListener; - private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(2, Constants.MAX_UPLOADS + private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(1, Constants.MAX_UPLOADS + Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1)); /** * All currently running uploads, indexed by token */ - private final Map<String, ActiveUpload> uploads = new ConcurrentHashMap<>(); + private final Map<String, IncomingDataTransfer> uploads = new ConcurrentHashMap<>(); /** * All currently running downloads, indexed by token */ - private final Map<String, ActiveDownload> downloads = new ConcurrentHashMap<>(); + private final Map<String, OutgoingDataTransfer> downloads = new ConcurrentHashMap<>(); private static final FileServer globalInstance = new FileServer(); private FileServer() { SSLContext ctx = Identity.getSSLContext(); sslListener = ctx == null ? null : new Listener(this, ctx, 9093, Constants.TRANSFER_TIMEOUT); + LOGGER.info("Max allowed concurrent uploads from clients: " + Constants.MAX_UPLOADS); + LOGGER.info("Max allowed concurrent downloads from clients: " + Constants.MAX_DOWNLOADS); } public static FileServer instance() { @@ -77,7 +79,7 @@ public class FileServer implements IncomingEvent { public void incomingDownloadRequest(Uploader uploader) throws IOException { String token = uploader.getToken(); LOGGER.info("Incoming filetransfer with token " + token); - ActiveDownload download = downloads.get(token); + OutgoingDataTransfer download = downloads.get(token); if (download == null) { LOGGER.warn("Unknown token " + token); uploader.cancel(); @@ -92,7 +94,7 @@ public class FileServer implements IncomingEvent { public void incomingUploadRequest(Downloader downloader) throws IOException { String token = downloader.getToken(); LOGGER.info("Incoming filetransfer with token " + token); - ActiveUpload upload = uploads.get(token); + IncomingDataTransfer upload = uploads.get(token); if (upload == null) { LOGGER.warn("Unknown token " + token); downloader.cancel(); @@ -109,25 +111,25 @@ public class FileServer implements IncomingEvent { * @param uploadToken * @return */ - public ActiveUpload getUploadByToken(String uploadToken) { + public IncomingDataTransfer getUploadByToken(String uploadToken) { if (uploadToken == null) return null; return uploads.get(uploadToken); } - public ActiveDownload getDownloadByToken(String downloadToken) { + public OutgoingDataTransfer getDownloadByToken(String downloadToken) { if (downloadToken == null) return null; return downloads.get(downloadToken); } - public ActiveUpload createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize, + public IncomingDataTransfer createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) throws TTransferRejectedException { - Iterator<ActiveUpload> it = uploads.values().iterator(); + Iterator<IncomingDataTransfer> it = uploads.values().iterator(); final long now = System.currentTimeMillis(); int activeUploads = 0; while (it.hasNext()) { - ActiveUpload upload = it.next(); + IncomingDataTransfer upload = it.next(); if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) { upload.cancel(); it.remove(); @@ -146,9 +148,9 @@ public class FileServer implements IncomingEvent { destinationFile.getParentFile().mkdirs(); String key = UUID.randomUUID().toString(); - ActiveUpload upload; + IncomingDataTransfer upload; try { - upload = new ActiveUpload(key, owner, image, destinationFile, fileSize, sha1Sums, + upload = new IncomingDataTransfer(key, owner, image, destinationFile, fileSize, sha1Sums, machineDescription); } catch (FileNotFoundException e) { LOGGER.error("Could not open destination file for writing", e); @@ -171,13 +173,13 @@ public class FileServer implements IncomingEvent { return sslListener.getPort(); } - public ActiveDownload createNewUserDownload(LocalImageVersion localImageData) + public OutgoingDataTransfer createNewUserDownload(LocalImageVersion localImageData) throws TTransferRejectedException { - Iterator<ActiveDownload> it = downloads.values().iterator(); + Iterator<OutgoingDataTransfer> it = downloads.values().iterator(); final long now = System.currentTimeMillis(); int activeDownloads = 0; while (it.hasNext()) { - ActiveDownload download = it.next(); + OutgoingDataTransfer download = it.next(); if (download.isComplete(now) || download.hasReachedIdleTimeout(now)) { download.cancel(); it.remove(); @@ -215,7 +217,7 @@ public class FileServer implements IncomingEvent { throw new TTransferRejectedException(errorMessage); } String key = UUID.randomUUID().toString(); - ActiveDownload transfer = new ActiveDownload(key, srcFile); + OutgoingDataTransfer transfer = new OutgoingDataTransfer(key, srcFile); downloads.put(key, transfer); return transfer; } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java index 16f85fc1..0ca5d9b6 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java @@ -4,6 +4,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.sql.SQLException; import java.util.ArrayList; @@ -11,15 +12,20 @@ import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.SSLContext; + import org.apache.log4j.Logger; import org.openslx.bwlp.sat.database.mappers.DbImage; +import org.openslx.bwlp.sat.thrift.ThriftUtil; import org.openslx.bwlp.sat.util.Configuration; import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; import org.openslx.bwlp.sat.util.Util; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; +import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.ImageVersionWrite; +import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.bwlp.thrift.iface.UserInfo; @@ -33,9 +39,9 @@ import org.openslx.filetransfer.util.HashChecker; import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; import org.openslx.filetransfer.util.HashChecker.HashResult; -public class ActiveUpload extends AbstractTransfer implements HashCheckCallback { +public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback { - private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class); + private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class); /** * How many concurrent connections per upload @@ -45,10 +51,10 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback /** * Self reference for inner classes. */ - private final ActiveUpload activeUpload = this; + private final IncomingDataTransfer activeUpload = this; /** - * This is an active upload, so on our end, we have a Downloader. + * Remote peer is uploading, so on our end, we have Downloaders */ private List<Downloader> downloads = new ArrayList<>(); @@ -96,6 +102,11 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback */ private boolean fileWritable = true; + /** + * Set if this is a download from the master server + */ + private final TransferInformation masterTransferInfo; + private static final HashChecker hashChecker; static { @@ -108,8 +119,9 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback hashChecker = hc; } - public ActiveUpload(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile, - long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) throws FileNotFoundException { + public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image, + File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) + throws FileNotFoundException { super(uploadId); this.tmpFileName = destinationFile; this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw"); @@ -118,6 +130,76 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback this.image = image; this.fileSize = fileSize; this.machineDescription = machineDescription; + this.masterTransferInfo = null; + } + + public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo) + throws FileNotFoundException { + super(publishData.imageVersionId); + ImageDetailsRead idr = new ImageDetailsRead(); + idr.setCreateTime(publishData.createTime); + idr.setDescription(publishData.description); + idr.setImageBaseId(publishData.imageBaseId); + idr.setImageName(publishData.imageName); + idr.setIsTemplate(publishData.isTemplate); + idr.setLatestVersionId(publishData.imageVersionId); + idr.setOsId(publishData.osId); + idr.setOwnerId(publishData.user.userId); + idr.setTags(publishData.tags); + idr.setUpdaterId(publishData.user.userId); + idr.setUpdateTime(publishData.createTime); + idr.setVirtId(publishData.virtId); + this.tmpFileName = tmpFile; + this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw"); + this.chunks = new ChunkList(publishData.fileSize, hashChecker == null ? null + : ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes)); + this.owner = publishData.user; + this.image = idr; + this.fileSize = publishData.fileSize; + this.machineDescription = transferInfo.machineDescription.getBytes(StandardCharsets.UTF_8); + this.masterTransferInfo = transferInfo; + this.versionSettings = new ImageVersionWrite(false); + } + + /** + * Called periodically if this is a transfer from the master server, so we + * can make sure the transfer is running. + */ + public void heartBeat(ThreadPoolExecutor pool) { + if (masterTransferInfo == null) + return; + synchronized (this) { + synchronized (downloads) { + if (downloads.size() >= 1) // TODO What to pick here? + return; + } + Downloader downloader = null; + if (masterTransferInfo.plainPort != 0) { + try { + downloader = new Downloader(Configuration.getMasterServerAddress(), + masterTransferInfo.plainPort, Constants.TRANSFER_TIMEOUT, null, + masterTransferInfo.token); + } catch (Exception e1) { + LOGGER.debug("Plain connect failed", e1); + downloader = null; + } + } + if (downloader == null && masterTransferInfo.sslPort != 0) { + try { + downloader = new Downloader(Configuration.getMasterServerAddress(), + masterTransferInfo.sslPort, Constants.TRANSFER_TIMEOUT, SSLContext.getDefault(), // TODO: Use the TLSv1.2 one once the master is ready + masterTransferInfo.token); + } catch (Exception e2) { + LOGGER.debug("SSL connect failed", e2); + downloader = null; + } + } + if (downloader == null) { + LOGGER.warn("Could not connect to master server for downloading " + image.imageName); + return; + } + addConnection(downloader, pool); + } } /** @@ -442,4 +524,14 @@ public class ActiveUpload extends AbstractTransfer implements HashCheckCallback return downloads.size(); } + @Override + protected void finalize() { + try { + Util.safeClose(tmpFileHandle); + if (tmpFileName.exists()) + tmpFileName.delete(); + } catch (Throwable t) { + } + } + } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java index 504317f3..e7c6715f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java @@ -9,9 +9,9 @@ import org.apache.log4j.Logger; import org.openslx.bwlp.sat.util.Constants; import org.openslx.filetransfer.Uploader; -public class ActiveDownload extends AbstractTransfer { +public class OutgoingDataTransfer extends AbstractTransfer { - private static final Logger LOGGER = Logger.getLogger(ActiveDownload.class); + private static final Logger LOGGER = Logger.getLogger(OutgoingDataTransfer.class); /** * How many concurrent connections per download @@ -19,7 +19,7 @@ public class ActiveDownload extends AbstractTransfer { private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_DOWNLOADS / 4, 1); /** - * This is a download, so we have uploaders + * Remote peer is downloading, so we have Uploaders */ private List<Uploader> uploads = new ArrayList<>(); @@ -27,12 +27,20 @@ public class ActiveDownload extends AbstractTransfer { private boolean isCanceled = false; - public ActiveDownload(String uuid, File file) { + public OutgoingDataTransfer(String uuid, File file) { super(uuid); this.sourceFile = file; } /** + * Called periodically if this is a transfer from the master server, so we + * can make sure the transfer is running. + */ + public void heartBeat(ThreadPoolExecutor pool) { + // TODO + } + + /** * Add another connection for this file transfer. Currently only one * connection is allowed, but this might change in the future. * diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java new file mode 100644 index 00000000..82c4ab9e --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java @@ -0,0 +1,132 @@ +package org.openslx.bwlp.sat.fileserv; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.openslx.bwlp.sat.util.Constants; +import org.openslx.bwlp.sat.util.Formatter; +import org.openslx.bwlp.thrift.iface.ImagePublishData; +import org.openslx.bwlp.thrift.iface.InvocationError; +import org.openslx.bwlp.thrift.iface.TAuthorizationException; +import org.openslx.bwlp.thrift.iface.TInvocationException; +import org.openslx.bwlp.thrift.iface.TNotFoundException; +import org.openslx.bwlp.thrift.iface.TransferInformation; +import org.openslx.thrifthelper.ThriftManager; +import org.openslx.util.QuickTimer; +import org.openslx.util.QuickTimer.Task; + +/** + * Manages file transfers between this satellite and the master server. + */ +public class SyncTransferHandler { + + private static final Logger LOGGER = Logger.getLogger(SyncTransferHandler.class); + + private static final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(1, + Constants.MAX_MASTER_UPLOADS + Constants.MAX_MASTER_DOWNLOADS, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<Runnable>(1)); + + /** + * All currently running uploads, indexed by token + */ + private static final Map<String, IncomingDataTransfer> downloads = new ConcurrentHashMap<>(); + + /** + * All currently running downloads, indexed by token + */ + private static final Map<String, OutgoingDataTransfer> uploads = new ConcurrentHashMap<>(); + + private static Task heartBeatTask = new Task() { + private final Runnable worker = new Runnable() { + @Override + public void run() { + for (IncomingDataTransfer download : downloads.values()) { + if (download.isActive()) + download.heartBeat(transferPool); + } + for (OutgoingDataTransfer upload : uploads.values()) { + if (upload.isActive()) + upload.heartBeat(transferPool); + } + } + }; + + @Override + public void fire() { + if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 1) + return; + transferPool.execute(worker); + } + }; + + // + + static { + QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56)); + } + + public synchronized static String requestImageDownload(ImagePublishData image) + throws TInvocationException, TAuthorizationException, TNotFoundException { + TransferInformation transferInfo; + // Already replicating this one? + if (downloads.containsKey(image.imageVersionId)) + return image.imageVersionId; + checkDownloadCount(); + try { + transferInfo = ThriftManager.getMasterClient().downloadImage(null, image.imageVersionId); + } catch (TAuthorizationException e) { + LOGGER.warn("Master server rejected our session on downloadImage", e); + throw e; + } catch (TNotFoundException e) { + LOGGER.warn("Master server couldn't find image on downloadImage", e); + throw e; + } catch (TException e) { + LOGGER.warn("Master server made a boo-boo on downloadImage", e); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Communication with master server failed"); + } + File tmpFile = null; + do { + tmpFile = Formatter.getTempImageName(); + } while (tmpFile.exists()); + tmpFile.getParentFile().mkdirs(); + try { + IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo); + downloads.put(transfer.getId(), transfer); + return transfer.getId(); + } catch (FileNotFoundException e) { + LOGGER.warn("Could not open " + tmpFile.getAbsolutePath()); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Could not access local file for writing"); + } + } + + private static void checkDownloadCount() throws TInvocationException { + Iterator<IncomingDataTransfer> it = downloads.values().iterator(); + final long now = System.currentTimeMillis(); + int activeDownloads = 0; + while (it.hasNext()) { + IncomingDataTransfer upload = it.next(); + if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) { + upload.cancel(); + it.remove(); + continue; + } + activeDownloads++; + } + if (activeDownloads > Constants.MAX_MASTER_DOWNLOADS) { + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Server busy. Too many running downloads (" + activeDownloads + "/" + + Constants.MAX_MASTER_DOWNLOADS + ")."); + } + } + +} |