From 10f0687fe551bda88120c2dc2b003035dd9bbea8 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 28 Aug 2015 18:04:16 +0200 Subject: [server] Working on image download from master server --- .../src/main/java/org/openslx/bwlp/sat/App.java | 4 +- .../openslx/bwlp/sat/database/mappers/DbImage.java | 44 ++ .../openslx/bwlp/sat/fileserv/ActiveDownload.java | 94 ---- .../openslx/bwlp/sat/fileserv/ActiveUpload.java | 445 ----------------- .../org/openslx/bwlp/sat/fileserv/FileServer.java | 34 +- .../bwlp/sat/fileserv/IncomingDataTransfer.java | 537 +++++++++++++++++++++ .../bwlp/sat/fileserv/OutgoingDataTransfer.java | 102 ++++ .../bwlp/sat/fileserv/SyncTransferHandler.java | 132 +++++ .../org/openslx/bwlp/sat/permissions/User.java | 30 ++ .../org/openslx/bwlp/sat/thrift/ServerHandler.java | 70 +-- .../org/openslx/bwlp/sat/thrift/ThriftUtil.java | 30 ++ .../org/openslx/bwlp/sat/util/Configuration.java | 6 + .../java/org/openslx/bwlp/sat/util/Constants.java | 32 +- 13 files changed, 970 insertions(+), 590 deletions(-) delete mode 100644 dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java delete mode 100644 dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java create mode 100644 dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java create mode 100644 dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java create mode 100644 dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java create mode 100644 dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java (limited to 'dozentenmodulserver/src/main/java') diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java index 62000490..86d231ef 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java @@ -82,8 +82,8 @@ public class App { } }); - ThriftManager.setMasterServerAddress(SSLContext.getDefault(), - "bwlp-masterserver.ruf.uni-freiburg.de", 9091, 30000); + ThriftManager.setMasterServerAddress(SSLContext.getDefault(), // TODO: Use the TLSv1.2 one once the master is ready + Configuration.getMasterServerAddress(), 9091, 30000); // Load useful things from master server OrganizationList.get(); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java index 2e2393f8..c03d8322 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java @@ -21,6 +21,7 @@ import org.openslx.bwlp.sat.util.Util; import org.openslx.bwlp.thrift.iface.ImageBaseWrite; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImagePermissions; +import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.ImageSummaryRead; import org.openslx.bwlp.thrift.iface.ImageVersionDetails; import org.openslx.bwlp.thrift.iface.ImageVersionWrite; @@ -283,6 +284,49 @@ public class DbImage { } } + /** + * Create or update a base image with the given publish data. + * Used for replication from master server. + * + * @param user The user who triggered the download, and will be considered + * the creator; if null, the creator of the image will be used + * @param image The image to create + * @throws SQLException + */ + public static void writeBaseImage(UserInfo user, ImagePublishData image) throws SQLException { + if (user == null) { + user = image.user; + } + try (MysqlConnection connection = Database.getConnection()) { + MysqlStatement stmt = connection.prepareStatement("INSERT INTO imagebase" + + " (imagebaseid, displayname, description, osid, virtid, createtime," + + " updatetime, ownerid, updaterid, sharemode, istemplate," + + " canlinkdefault, candownloaddefault, caneditdefault, canadmindefault)" + + " VALUES " + + " (:imagebaseid, :displayname, :description, :osid, :virtid, :unixtime," + + " :unixtime, :userid, :userid, :sharemode, :istemplate," + + " 1, 1, 0, 0) " + + " ON DUPLICATE KEY UPDATE " + + " displayname = VALUES(displayname), description = VALUES(description)," + + " osid = VALUES(osid), virtid = VALUES(virtid), updatetime = VALUES(updatetime)," + + " updaterid = VALUES(updaterid), istemplate = VALUES(istemplate)"); + stmt.setString("imagebaseid", image.imageBaseId); + stmt.setString("displayname", image.imageName); + stmt.setString("description", image.description); + stmt.setInt("osid", image.osId); + stmt.setString("virtid", image.virtId); + stmt.setLong("unixtime", Util.unixTime()); + stmt.setString("userid", user.userId); + stmt.setString("sharemode", "DOWNLOAD"); + stmt.setBoolean("istemplate", image.isTemplate); + stmt.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + LOGGER.error("Query failed in DbImage.writeBaseImage()", e); + throw e; + } + } + public static void updateImageMetadata(UserInfo user, String imageBaseId, ImageBaseWrite image) throws SQLException { if (image.imageName.length() > 100) { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java deleted file mode 100644 index 504317f3..00000000 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveDownload.java +++ /dev/null @@ -1,94 +0,0 @@ -package org.openslx.bwlp.sat.fileserv; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; - -import org.apache.log4j.Logger; -import org.openslx.bwlp.sat.util.Constants; -import org.openslx.filetransfer.Uploader; - -public class ActiveDownload extends AbstractTransfer { - - private static final Logger LOGGER = Logger.getLogger(ActiveDownload.class); - - /** - * How many concurrent connections per download - */ - private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_DOWNLOADS / 4, 1); - - /** - * This is a download, so we have uploaders - */ - private List uploads = new ArrayList<>(); - - private final File sourceFile; - - private boolean isCanceled = false; - - public ActiveDownload(String uuid, File file) { - super(uuid); - this.sourceFile = file; - } - - /** - * Add another connection for this file transfer. Currently only one - * connection is allowed, but this might change in the future. - * - * @param connection - * @return true if the connection is accepted, false if it should be - * discarded - */ - public synchronized boolean addConnection(final Uploader connection, ThreadPoolExecutor pool) { - if (isCanceled) - return false; - potentialFinishTime.set(0); - synchronized (uploads) { - if (uploads.size() > MAX_CONNECTIONS) - return false; - uploads.add(connection); - } - try { - pool.execute(new Runnable() { - @Override - public void run() { - potentialFinishTime.set(0); - boolean ret = connection.upload(sourceFile.getAbsolutePath()); - synchronized (uploads) { - uploads.remove(connection); - } - if (ret && uploads.isEmpty()) { - potentialFinishTime.set(System.currentTimeMillis()); - } - lastActivityTime.set(System.currentTimeMillis()); - } - }); - } catch (Exception e) { - LOGGER.warn("threadpool rejected the incoming file transfer", e); - return false; - } - return true; - } - - @Override - public synchronized void cancel() { - isCanceled = true; - synchronized (uploads) { - for (Uploader u : uploads) { - u.cancel(); - } - } - } - - @Override - public boolean isActive() { - return !isCanceled; - } - - @Override - public int getActiveConnectionCount() { - return uploads.size(); - } - -} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java deleted file mode 100644 index 16f85fc1..00000000 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java +++ /dev/null @@ -1,445 +0,0 @@ -package org.openslx.bwlp.sat.fileserv; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.security.NoSuchAlgorithmException; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.log4j.Logger; -import org.openslx.bwlp.sat.database.mappers.DbImage; -import org.openslx.bwlp.sat.util.Configuration; -import org.openslx.bwlp.sat.util.Constants; -import org.openslx.bwlp.sat.util.FileSystem; -import org.openslx.bwlp.sat.util.Formatter; -import org.openslx.bwlp.sat.util.Util; -import org.openslx.bwlp.thrift.iface.ImageDetailsRead; -import org.openslx.bwlp.thrift.iface.ImageVersionWrite; -import org.openslx.bwlp.thrift.iface.TransferState; -import org.openslx.bwlp.thrift.iface.TransferStatus; -import org.openslx.bwlp.thrift.iface.UserInfo; -import org.openslx.filetransfer.DataReceivedCallback; -import org.openslx.filetransfer.Downloader; -import org.openslx.filetransfer.FileRange; -import org.openslx.filetransfer.WantRangeCallback; -import org.openslx.filetransfer.util.ChunkList; -import org.openslx.filetransfer.util.FileChunk; -import org.openslx.filetransfer.util.HashChecker; -import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; -import org.openslx.filetransfer.util.HashChecker.HashResult; - -public class ActiveUpload extends AbstractTransfer implements HashCheckCallback { - - private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class); - - /** - * How many concurrent connections per upload - */ - private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_UPLOADS / 4, 1); - - /** - * Self reference for inner classes. - */ - private final ActiveUpload activeUpload = this; - - /** - * This is an active upload, so on our end, we have a Downloader. - */ - private List downloads = new ArrayList<>(); - - private final File tmpFileName; - - private final RandomAccessFile tmpFileHandle; - - private final ChunkList chunks; - - private final long fileSize; - - /** - * User owning this uploaded file. - */ - private final UserInfo owner; - - /** - * Base image this upload is a new version for. - */ - private final ImageDetailsRead image; - - /** - * Flags to set for this new image version. Optional field. - */ - private ImageVersionWrite versionSettings = null; - - /** - * TransferState of this upload - */ - private TransferState state = TransferState.IDLE; - - /** - * Description of this VM - binary dump of e.g. the *.vmx file (VMware) - */ - private final byte[] machineDescription; - - /** - * Indicated whether the version information was written to db already. - * Disallow setVersionData in that case. - */ - private final AtomicBoolean versionWrittenToDb = new AtomicBoolean(); - - /** - * Whether file is (still) writable. Used for the file transfer callbacks. - */ - private boolean fileWritable = true; - - private static final HashChecker hashChecker; - - static { - HashChecker hc; - try { - hc = new HashChecker("SHA-1"); - } catch (NoSuchAlgorithmException e) { - hc = null; - } - hashChecker = hc; - } - - public ActiveUpload(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile, - long fileSize, List sha1Sums, byte[] machineDescription) throws FileNotFoundException { - super(uploadId); - this.tmpFileName = destinationFile; - this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw"); - this.chunks = new ChunkList(fileSize, hashChecker == null ? null : sha1Sums); - this.owner = owner; - this.image = image; - this.fileSize = fileSize; - this.machineDescription = machineDescription; - } - - /** - * Set meta data for this image version. - * - * @param user - * - * @param data - */ - public boolean setVersionData(UserInfo user, ImageVersionWrite data) { - synchronized (versionWrittenToDb) { - if (versionWrittenToDb.get()) { - return false; - } - if (!user.userId.equals(owner.userId)) { - return false; - } - versionSettings = data; - return true; - } - } - - /** - * Add another connection for this file transfer. Currently only one - * connection is allowed, but this might change in the future. - * - * @param connection - * @return true if the connection is accepted, false if it should be - * discarded - */ - public synchronized boolean addConnection(final Downloader connection, ThreadPoolExecutor pool) { - if (state == TransferState.FINISHED || state == TransferState.ERROR) - return false; - synchronized (downloads) { - if (downloads.size() >= MAX_CONNECTIONS) - return false; - downloads.add(connection); - } - try { - pool.execute(new Runnable() { - @Override - public void run() { - CbHandler cbh = new CbHandler(); - if (!connection.download(cbh, cbh)) { - if (cbh.currentChunk != null) { - // If the download failed and we have a current chunk, put it back into - // the queue, so it will be handled again later... - chunks.markFailed(cbh.currentChunk); - } - LOGGER.warn("Download of " + tmpFileName.getAbsolutePath() + " failed"); - } - lastActivityTime.set(System.currentTimeMillis()); - synchronized (downloads) { - downloads.remove(connection); - } - if (chunks.isComplete()) { - finishUpload(); - } - } - }); - if (state == TransferState.IDLE) { - state = TransferState.WORKING; - } - } catch (Exception e) { - LOGGER.warn("threadpool rejected the incoming file transfer", e); - return false; - } - return true; - } - - /** - * Write some data to the local file. Thread safe so we can - * have multiple concurrent connections. - * - * @param fileOffset - * @param dataLength - * @param data - * @return - */ - private void writeFileData(long fileOffset, int dataLength, byte[] data) { - if (state != TransferState.WORKING) - throw new IllegalStateException("Cannot write to file if state != RUNNING"); - synchronized (tmpFileHandle) { - try { - tmpFileHandle.seek(fileOffset); - tmpFileHandle.write(data, 0, dataLength); - } catch (IOException e) { - LOGGER.error("Cannot write to '" + tmpFileName - + "'. Disk full, network storage error, bad permissions, ...?", e); - fileWritable = false; - state = TransferState.ERROR; - } - } - } - - /** - * Called when the upload finished. - */ - private synchronized void finishUpload() { - synchronized (tmpFileHandle) { - if (state != TransferState.WORKING) - return; - Util.safeClose(tmpFileHandle); - state = TransferState.FINISHED; - } - potentialFinishTime.set(System.currentTimeMillis()); - // If owner is not set, this was a repair-transfer, which downloads directly to the existing target file. - // Nothing more to do in that case. - if (isRepairUpload()) - return; - LOGGER.info("Finalizing uploaded image " + image.imageName); - // Ready to go. First step: Rename temp file to something usable - File destination = new File(tmpFileName.getParent(), Formatter.vmName(owner, image.imageName)); - // Sanity check: destination should be a sub directory of the vmStorePath - String relPath = FileSystem.getRelativePath(destination, Configuration.getVmStoreBasePath()); - if (relPath == null) { - LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of " - + Configuration.getVmStoreBasePath().getAbsolutePath()); - cancel(); - return; - } - if (relPath.length() > 200) { - LOGGER.error("Generated file name is >200 chars. DB will not like it"); - } - - // Execute rename - boolean ret = false; - Exception renameException = null; - try { - ret = tmpFileName.renameTo(destination); - } catch (Exception e) { - ret = false; - renameException = e; - } - if (!ret) { - // Rename failed :-( - LOGGER.warn( - "Could not rename '" + tmpFileName.getAbsolutePath() + "' to '" - + destination.getAbsolutePath() + "'", renameException); - cancel(); - return; - } - - // Now insert meta data into DB - try { - synchronized (versionWrittenToDb) { - DbImage.createImageVersion(image.imageBaseId, getId(), owner, fileSize, relPath, - versionSettings, chunks, machineDescription); - versionWrittenToDb.set(true); - } - } catch (SQLException e) { - LOGGER.error("Error finishing upload: Inserting version to DB failed", e); - state = TransferState.ERROR; - // Also delete uploaded file, as there is no refence to it - FileSystem.deleteAsync(destination); - cancel(); - return; - } - } - - @Override - public synchronized void cancel() { - if (state != TransferState.FINISHED) { - state = TransferState.ERROR; - if (!isRepairUpload() && tmpFileName.exists()) { - FileSystem.deleteAsync(tmpFileName); - } - } - synchronized (downloads) { - for (Downloader download : downloads) { - download.cancel(); - } - } - } - - public boolean isRepairUpload() { - return owner == null; - } - - /** - * Get user owning this upload. Can be null in special cases. - * - * @return instance of UserInfo for the according user. - */ - public UserInfo getOwner() { - return this.owner; - } - - public File getDestinationFile() { - return this.tmpFileName; - } - - public long getSize() { - return this.fileSize; - } - - /** - * Callback class for an instance of the Downloader, which supplies - * the Downloader with wanted file ranges, and handles incoming data. - */ - private class CbHandler implements WantRangeCallback, DataReceivedCallback { - /** - * The current chunk being transfered. - */ - private FileChunk currentChunk = null; - private byte[] buffer = new byte[FileChunk.CHUNK_SIZE]; - - @Override - public boolean dataReceived(long fileOffset, int dataLength, byte[] data) { - if (currentChunk == null) - throw new IllegalStateException("dataReceived without current chunk"); - if (!currentChunk.range.contains(fileOffset, fileOffset + dataLength)) - throw new IllegalStateException("dataReceived with file data out of range"); - System.arraycopy(data, 0, buffer, (int) (fileOffset - currentChunk.range.startOffset), dataLength); - return fileWritable; - } - - @Override - public FileRange get() { - if (currentChunk != null) { - if (hashChecker != null && currentChunk.getSha1Sum() != null) { - try { - hashChecker.queue(currentChunk, buffer, activeUpload); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } - buffer = new byte[buffer.length]; - } else { - writeFileData(currentChunk.range.startOffset, currentChunk.range.getLength(), buffer); - chunks.markSuccessful(currentChunk); - } - } - // Get next missing chunk - try { - currentChunk = chunks.getMissing(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } - if (currentChunk == null) { - return null; // No more chunks, returning null tells the Downloader we're done. - } - return currentChunk.range; - } - } - - public synchronized TransferStatus getStatus() { - return new TransferStatus(chunks.getStatusArray(), state); - } - - @Override - public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) { - switch (result) { - case FAILURE: - LOGGER.warn("Hash check of chunk " + chunk.toString() - + " could not be executed. Assuming valid :-("); - // Fall through - case VALID: - writeFileData(chunk.range.startOffset, chunk.range.getLength(), data); - chunks.markSuccessful(chunk); - if (chunks.isComplete()) { - finishUpload(); - } - break; - case INVALID: - LOGGER.warn("Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch " - + chunk.getFailCount() + "x :-("); - chunks.markFailed(chunk); - break; - } - } - - private byte[] loadChunkFromFile(FileChunk chunk) { - synchronized (tmpFileHandle) { - if (state != TransferState.IDLE && state != TransferState.WORKING) - return null; - try { - tmpFileHandle.seek(chunk.range.startOffset); - byte[] buffer = new byte[chunk.range.getLength()]; - tmpFileHandle.readFully(buffer); - return buffer; - } catch (IOException e) { - LOGGER.error( - "Could not read chunk " + chunk.getChunkIndex() + " of File " - + tmpFileName.toString(), e); - return null; - } - } - } - - public void updateBlockHashList(List hashList) { - if (state != TransferState.IDLE && state != TransferState.WORKING) - return; - if (hashChecker == null) - return; - chunks.updateSha1Sums(hashList); - FileChunk chunk; - while (null != (chunk = chunks.getUnhashedComplete())) { - byte[] data = loadChunkFromFile(chunk); - if (data == null) { - LOGGER.warn("Will mark unloadable chunk as valid :-("); - chunks.markSuccessful(chunk); - continue; - } - try { - hashChecker.queue(chunk, data, this); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - } - } - - @Override - public boolean isActive() { - return state == TransferState.IDLE || state == TransferState.WORKING; - } - - @Override - public int getActiveConnectionCount() { - return downloads.size(); - } - -} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java index 6e8bcf1c..f821813f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java @@ -41,24 +41,26 @@ public class FileServer implements IncomingEvent { private final Listener sslListener; - private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(2, Constants.MAX_UPLOADS + private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(1, Constants.MAX_UPLOADS + Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new ArrayBlockingQueue(1)); /** * All currently running uploads, indexed by token */ - private final Map uploads = new ConcurrentHashMap<>(); + private final Map uploads = new ConcurrentHashMap<>(); /** * All currently running downloads, indexed by token */ - private final Map downloads = new ConcurrentHashMap<>(); + private final Map downloads = new ConcurrentHashMap<>(); private static final FileServer globalInstance = new FileServer(); private FileServer() { SSLContext ctx = Identity.getSSLContext(); sslListener = ctx == null ? null : new Listener(this, ctx, 9093, Constants.TRANSFER_TIMEOUT); + LOGGER.info("Max allowed concurrent uploads from clients: " + Constants.MAX_UPLOADS); + LOGGER.info("Max allowed concurrent downloads from clients: " + Constants.MAX_DOWNLOADS); } public static FileServer instance() { @@ -77,7 +79,7 @@ public class FileServer implements IncomingEvent { public void incomingDownloadRequest(Uploader uploader) throws IOException { String token = uploader.getToken(); LOGGER.info("Incoming filetransfer with token " + token); - ActiveDownload download = downloads.get(token); + OutgoingDataTransfer download = downloads.get(token); if (download == null) { LOGGER.warn("Unknown token " + token); uploader.cancel(); @@ -92,7 +94,7 @@ public class FileServer implements IncomingEvent { public void incomingUploadRequest(Downloader downloader) throws IOException { String token = downloader.getToken(); LOGGER.info("Incoming filetransfer with token " + token); - ActiveUpload upload = uploads.get(token); + IncomingDataTransfer upload = uploads.get(token); if (upload == null) { LOGGER.warn("Unknown token " + token); downloader.cancel(); @@ -109,25 +111,25 @@ public class FileServer implements IncomingEvent { * @param uploadToken * @return */ - public ActiveUpload getUploadByToken(String uploadToken) { + public IncomingDataTransfer getUploadByToken(String uploadToken) { if (uploadToken == null) return null; return uploads.get(uploadToken); } - public ActiveDownload getDownloadByToken(String downloadToken) { + public OutgoingDataTransfer getDownloadByToken(String downloadToken) { if (downloadToken == null) return null; return downloads.get(downloadToken); } - public ActiveUpload createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize, + public IncomingDataTransfer createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize, List sha1Sums, byte[] machineDescription) throws TTransferRejectedException { - Iterator it = uploads.values().iterator(); + Iterator it = uploads.values().iterator(); final long now = System.currentTimeMillis(); int activeUploads = 0; while (it.hasNext()) { - ActiveUpload upload = it.next(); + IncomingDataTransfer upload = it.next(); if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) { upload.cancel(); it.remove(); @@ -146,9 +148,9 @@ public class FileServer implements IncomingEvent { destinationFile.getParentFile().mkdirs(); String key = UUID.randomUUID().toString(); - ActiveUpload upload; + IncomingDataTransfer upload; try { - upload = new ActiveUpload(key, owner, image, destinationFile, fileSize, sha1Sums, + upload = new IncomingDataTransfer(key, owner, image, destinationFile, fileSize, sha1Sums, machineDescription); } catch (FileNotFoundException e) { LOGGER.error("Could not open destination file for writing", e); @@ -171,13 +173,13 @@ public class FileServer implements IncomingEvent { return sslListener.getPort(); } - public ActiveDownload createNewUserDownload(LocalImageVersion localImageData) + public OutgoingDataTransfer createNewUserDownload(LocalImageVersion localImageData) throws TTransferRejectedException { - Iterator it = downloads.values().iterator(); + Iterator it = downloads.values().iterator(); final long now = System.currentTimeMillis(); int activeDownloads = 0; while (it.hasNext()) { - ActiveDownload download = it.next(); + OutgoingDataTransfer download = it.next(); if (download.isComplete(now) || download.hasReachedIdleTimeout(now)) { download.cancel(); it.remove(); @@ -215,7 +217,7 @@ public class FileServer implements IncomingEvent { throw new TTransferRejectedException(errorMessage); } String key = UUID.randomUUID().toString(); - ActiveDownload transfer = new ActiveDownload(key, srcFile); + OutgoingDataTransfer transfer = new OutgoingDataTransfer(key, srcFile); downloads.put(key, transfer); return transfer; } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java new file mode 100644 index 00000000..0ca5d9b6 --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java @@ -0,0 +1,537 @@ +package org.openslx.bwlp.sat.fileserv; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.charset.StandardCharsets; +import java.security.NoSuchAlgorithmException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.SSLContext; + +import org.apache.log4j.Logger; +import org.openslx.bwlp.sat.database.mappers.DbImage; +import org.openslx.bwlp.sat.thrift.ThriftUtil; +import org.openslx.bwlp.sat.util.Configuration; +import org.openslx.bwlp.sat.util.Constants; +import org.openslx.bwlp.sat.util.FileSystem; +import org.openslx.bwlp.sat.util.Formatter; +import org.openslx.bwlp.sat.util.Util; +import org.openslx.bwlp.thrift.iface.ImageDetailsRead; +import org.openslx.bwlp.thrift.iface.ImagePublishData; +import org.openslx.bwlp.thrift.iface.ImageVersionWrite; +import org.openslx.bwlp.thrift.iface.TransferInformation; +import org.openslx.bwlp.thrift.iface.TransferState; +import org.openslx.bwlp.thrift.iface.TransferStatus; +import org.openslx.bwlp.thrift.iface.UserInfo; +import org.openslx.filetransfer.DataReceivedCallback; +import org.openslx.filetransfer.Downloader; +import org.openslx.filetransfer.FileRange; +import org.openslx.filetransfer.WantRangeCallback; +import org.openslx.filetransfer.util.ChunkList; +import org.openslx.filetransfer.util.FileChunk; +import org.openslx.filetransfer.util.HashChecker; +import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; +import org.openslx.filetransfer.util.HashChecker.HashResult; + +public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback { + + private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class); + + /** + * How many concurrent connections per upload + */ + private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_UPLOADS / 4, 1); + + /** + * Self reference for inner classes. + */ + private final IncomingDataTransfer activeUpload = this; + + /** + * Remote peer is uploading, so on our end, we have Downloaders + */ + private List downloads = new ArrayList<>(); + + private final File tmpFileName; + + private final RandomAccessFile tmpFileHandle; + + private final ChunkList chunks; + + private final long fileSize; + + /** + * User owning this uploaded file. + */ + private final UserInfo owner; + + /** + * Base image this upload is a new version for. + */ + private final ImageDetailsRead image; + + /** + * Flags to set for this new image version. Optional field. + */ + private ImageVersionWrite versionSettings = null; + + /** + * TransferState of this upload + */ + private TransferState state = TransferState.IDLE; + + /** + * Description of this VM - binary dump of e.g. the *.vmx file (VMware) + */ + private final byte[] machineDescription; + + /** + * Indicated whether the version information was written to db already. + * Disallow setVersionData in that case. + */ + private final AtomicBoolean versionWrittenToDb = new AtomicBoolean(); + + /** + * Whether file is (still) writable. Used for the file transfer callbacks. + */ + private boolean fileWritable = true; + + /** + * Set if this is a download from the master server + */ + private final TransferInformation masterTransferInfo; + + private static final HashChecker hashChecker; + + static { + HashChecker hc; + try { + hc = new HashChecker("SHA-1"); + } catch (NoSuchAlgorithmException e) { + hc = null; + } + hashChecker = hc; + } + + public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image, + File destinationFile, long fileSize, List sha1Sums, byte[] machineDescription) + throws FileNotFoundException { + super(uploadId); + this.tmpFileName = destinationFile; + this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw"); + this.chunks = new ChunkList(fileSize, hashChecker == null ? null : sha1Sums); + this.owner = owner; + this.image = image; + this.fileSize = fileSize; + this.machineDescription = machineDescription; + this.masterTransferInfo = null; + } + + public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo) + throws FileNotFoundException { + super(publishData.imageVersionId); + ImageDetailsRead idr = new ImageDetailsRead(); + idr.setCreateTime(publishData.createTime); + idr.setDescription(publishData.description); + idr.setImageBaseId(publishData.imageBaseId); + idr.setImageName(publishData.imageName); + idr.setIsTemplate(publishData.isTemplate); + idr.setLatestVersionId(publishData.imageVersionId); + idr.setOsId(publishData.osId); + idr.setOwnerId(publishData.user.userId); + idr.setTags(publishData.tags); + idr.setUpdaterId(publishData.user.userId); + idr.setUpdateTime(publishData.createTime); + idr.setVirtId(publishData.virtId); + this.tmpFileName = tmpFile; + this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw"); + this.chunks = new ChunkList(publishData.fileSize, hashChecker == null ? null + : ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes)); + this.owner = publishData.user; + this.image = idr; + this.fileSize = publishData.fileSize; + this.machineDescription = transferInfo.machineDescription.getBytes(StandardCharsets.UTF_8); + this.masterTransferInfo = transferInfo; + this.versionSettings = new ImageVersionWrite(false); + } + + /** + * Called periodically if this is a transfer from the master server, so we + * can make sure the transfer is running. + */ + public void heartBeat(ThreadPoolExecutor pool) { + if (masterTransferInfo == null) + return; + synchronized (this) { + synchronized (downloads) { + if (downloads.size() >= 1) // TODO What to pick here? + return; + } + Downloader downloader = null; + if (masterTransferInfo.plainPort != 0) { + try { + downloader = new Downloader(Configuration.getMasterServerAddress(), + masterTransferInfo.plainPort, Constants.TRANSFER_TIMEOUT, null, + masterTransferInfo.token); + } catch (Exception e1) { + LOGGER.debug("Plain connect failed", e1); + downloader = null; + } + } + if (downloader == null && masterTransferInfo.sslPort != 0) { + try { + downloader = new Downloader(Configuration.getMasterServerAddress(), + masterTransferInfo.sslPort, Constants.TRANSFER_TIMEOUT, SSLContext.getDefault(), // TODO: Use the TLSv1.2 one once the master is ready + masterTransferInfo.token); + } catch (Exception e2) { + LOGGER.debug("SSL connect failed", e2); + downloader = null; + } + } + if (downloader == null) { + LOGGER.warn("Could not connect to master server for downloading " + image.imageName); + return; + } + addConnection(downloader, pool); + } + } + + /** + * Set meta data for this image version. + * + * @param user + * + * @param data + */ + public boolean setVersionData(UserInfo user, ImageVersionWrite data) { + synchronized (versionWrittenToDb) { + if (versionWrittenToDb.get()) { + return false; + } + if (!user.userId.equals(owner.userId)) { + return false; + } + versionSettings = data; + return true; + } + } + + /** + * Add another connection for this file transfer. Currently only one + * connection is allowed, but this might change in the future. + * + * @param connection + * @return true if the connection is accepted, false if it should be + * discarded + */ + public synchronized boolean addConnection(final Downloader connection, ThreadPoolExecutor pool) { + if (state == TransferState.FINISHED || state == TransferState.ERROR) + return false; + synchronized (downloads) { + if (downloads.size() >= MAX_CONNECTIONS) + return false; + downloads.add(connection); + } + try { + pool.execute(new Runnable() { + @Override + public void run() { + CbHandler cbh = new CbHandler(); + if (!connection.download(cbh, cbh)) { + if (cbh.currentChunk != null) { + // If the download failed and we have a current chunk, put it back into + // the queue, so it will be handled again later... + chunks.markFailed(cbh.currentChunk); + } + LOGGER.warn("Download of " + tmpFileName.getAbsolutePath() + " failed"); + } + lastActivityTime.set(System.currentTimeMillis()); + synchronized (downloads) { + downloads.remove(connection); + } + if (chunks.isComplete()) { + finishUpload(); + } + } + }); + if (state == TransferState.IDLE) { + state = TransferState.WORKING; + } + } catch (Exception e) { + LOGGER.warn("threadpool rejected the incoming file transfer", e); + return false; + } + return true; + } + + /** + * Write some data to the local file. Thread safe so we can + * have multiple concurrent connections. + * + * @param fileOffset + * @param dataLength + * @param data + * @return + */ + private void writeFileData(long fileOffset, int dataLength, byte[] data) { + if (state != TransferState.WORKING) + throw new IllegalStateException("Cannot write to file if state != RUNNING"); + synchronized (tmpFileHandle) { + try { + tmpFileHandle.seek(fileOffset); + tmpFileHandle.write(data, 0, dataLength); + } catch (IOException e) { + LOGGER.error("Cannot write to '" + tmpFileName + + "'. Disk full, network storage error, bad permissions, ...?", e); + fileWritable = false; + state = TransferState.ERROR; + } + } + } + + /** + * Called when the upload finished. + */ + private synchronized void finishUpload() { + synchronized (tmpFileHandle) { + if (state != TransferState.WORKING) + return; + Util.safeClose(tmpFileHandle); + state = TransferState.FINISHED; + } + potentialFinishTime.set(System.currentTimeMillis()); + // If owner is not set, this was a repair-transfer, which downloads directly to the existing target file. + // Nothing more to do in that case. + if (isRepairUpload()) + return; + LOGGER.info("Finalizing uploaded image " + image.imageName); + // Ready to go. First step: Rename temp file to something usable + File destination = new File(tmpFileName.getParent(), Formatter.vmName(owner, image.imageName)); + // Sanity check: destination should be a sub directory of the vmStorePath + String relPath = FileSystem.getRelativePath(destination, Configuration.getVmStoreBasePath()); + if (relPath == null) { + LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of " + + Configuration.getVmStoreBasePath().getAbsolutePath()); + cancel(); + return; + } + if (relPath.length() > 200) { + LOGGER.error("Generated file name is >200 chars. DB will not like it"); + } + + // Execute rename + boolean ret = false; + Exception renameException = null; + try { + ret = tmpFileName.renameTo(destination); + } catch (Exception e) { + ret = false; + renameException = e; + } + if (!ret) { + // Rename failed :-( + LOGGER.warn( + "Could not rename '" + tmpFileName.getAbsolutePath() + "' to '" + + destination.getAbsolutePath() + "'", renameException); + cancel(); + return; + } + + // Now insert meta data into DB + try { + synchronized (versionWrittenToDb) { + DbImage.createImageVersion(image.imageBaseId, getId(), owner, fileSize, relPath, + versionSettings, chunks, machineDescription); + versionWrittenToDb.set(true); + } + } catch (SQLException e) { + LOGGER.error("Error finishing upload: Inserting version to DB failed", e); + state = TransferState.ERROR; + // Also delete uploaded file, as there is no refence to it + FileSystem.deleteAsync(destination); + cancel(); + return; + } + } + + @Override + public synchronized void cancel() { + if (state != TransferState.FINISHED) { + state = TransferState.ERROR; + if (!isRepairUpload() && tmpFileName.exists()) { + FileSystem.deleteAsync(tmpFileName); + } + } + synchronized (downloads) { + for (Downloader download : downloads) { + download.cancel(); + } + } + } + + public boolean isRepairUpload() { + return owner == null; + } + + /** + * Get user owning this upload. Can be null in special cases. + * + * @return instance of UserInfo for the according user. + */ + public UserInfo getOwner() { + return this.owner; + } + + public File getDestinationFile() { + return this.tmpFileName; + } + + public long getSize() { + return this.fileSize; + } + + /** + * Callback class for an instance of the Downloader, which supplies + * the Downloader with wanted file ranges, and handles incoming data. + */ + private class CbHandler implements WantRangeCallback, DataReceivedCallback { + /** + * The current chunk being transfered. + */ + private FileChunk currentChunk = null; + private byte[] buffer = new byte[FileChunk.CHUNK_SIZE]; + + @Override + public boolean dataReceived(long fileOffset, int dataLength, byte[] data) { + if (currentChunk == null) + throw new IllegalStateException("dataReceived without current chunk"); + if (!currentChunk.range.contains(fileOffset, fileOffset + dataLength)) + throw new IllegalStateException("dataReceived with file data out of range"); + System.arraycopy(data, 0, buffer, (int) (fileOffset - currentChunk.range.startOffset), dataLength); + return fileWritable; + } + + @Override + public FileRange get() { + if (currentChunk != null) { + if (hashChecker != null && currentChunk.getSha1Sum() != null) { + try { + hashChecker.queue(currentChunk, buffer, activeUpload); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + buffer = new byte[buffer.length]; + } else { + writeFileData(currentChunk.range.startOffset, currentChunk.range.getLength(), buffer); + chunks.markSuccessful(currentChunk); + } + } + // Get next missing chunk + try { + currentChunk = chunks.getMissing(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + if (currentChunk == null) { + return null; // No more chunks, returning null tells the Downloader we're done. + } + return currentChunk.range; + } + } + + public synchronized TransferStatus getStatus() { + return new TransferStatus(chunks.getStatusArray(), state); + } + + @Override + public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) { + switch (result) { + case FAILURE: + LOGGER.warn("Hash check of chunk " + chunk.toString() + + " could not be executed. Assuming valid :-("); + // Fall through + case VALID: + writeFileData(chunk.range.startOffset, chunk.range.getLength(), data); + chunks.markSuccessful(chunk); + if (chunks.isComplete()) { + finishUpload(); + } + break; + case INVALID: + LOGGER.warn("Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch " + + chunk.getFailCount() + "x :-("); + chunks.markFailed(chunk); + break; + } + } + + private byte[] loadChunkFromFile(FileChunk chunk) { + synchronized (tmpFileHandle) { + if (state != TransferState.IDLE && state != TransferState.WORKING) + return null; + try { + tmpFileHandle.seek(chunk.range.startOffset); + byte[] buffer = new byte[chunk.range.getLength()]; + tmpFileHandle.readFully(buffer); + return buffer; + } catch (IOException e) { + LOGGER.error( + "Could not read chunk " + chunk.getChunkIndex() + " of File " + + tmpFileName.toString(), e); + return null; + } + } + } + + public void updateBlockHashList(List hashList) { + if (state != TransferState.IDLE && state != TransferState.WORKING) + return; + if (hashChecker == null) + return; + chunks.updateSha1Sums(hashList); + FileChunk chunk; + while (null != (chunk = chunks.getUnhashedComplete())) { + byte[] data = loadChunkFromFile(chunk); + if (data == null) { + LOGGER.warn("Will mark unloadable chunk as valid :-("); + chunks.markSuccessful(chunk); + continue; + } + try { + hashChecker.queue(chunk, data, this); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + + @Override + public boolean isActive() { + return state == TransferState.IDLE || state == TransferState.WORKING; + } + + @Override + public int getActiveConnectionCount() { + return downloads.size(); + } + + @Override + protected void finalize() { + try { + Util.safeClose(tmpFileHandle); + if (tmpFileName.exists()) + tmpFileName.delete(); + } catch (Throwable t) { + } + } + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java new file mode 100644 index 00000000..e7c6715f --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java @@ -0,0 +1,102 @@ +package org.openslx.bwlp.sat.fileserv; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.log4j.Logger; +import org.openslx.bwlp.sat.util.Constants; +import org.openslx.filetransfer.Uploader; + +public class OutgoingDataTransfer extends AbstractTransfer { + + private static final Logger LOGGER = Logger.getLogger(OutgoingDataTransfer.class); + + /** + * How many concurrent connections per download + */ + private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_DOWNLOADS / 4, 1); + + /** + * Remote peer is downloading, so we have Uploaders + */ + private List uploads = new ArrayList<>(); + + private final File sourceFile; + + private boolean isCanceled = false; + + public OutgoingDataTransfer(String uuid, File file) { + super(uuid); + this.sourceFile = file; + } + + /** + * Called periodically if this is a transfer from the master server, so we + * can make sure the transfer is running. + */ + public void heartBeat(ThreadPoolExecutor pool) { + // TODO + } + + /** + * Add another connection for this file transfer. Currently only one + * connection is allowed, but this might change in the future. + * + * @param connection + * @return true if the connection is accepted, false if it should be + * discarded + */ + public synchronized boolean addConnection(final Uploader connection, ThreadPoolExecutor pool) { + if (isCanceled) + return false; + potentialFinishTime.set(0); + synchronized (uploads) { + if (uploads.size() > MAX_CONNECTIONS) + return false; + uploads.add(connection); + } + try { + pool.execute(new Runnable() { + @Override + public void run() { + potentialFinishTime.set(0); + boolean ret = connection.upload(sourceFile.getAbsolutePath()); + synchronized (uploads) { + uploads.remove(connection); + } + if (ret && uploads.isEmpty()) { + potentialFinishTime.set(System.currentTimeMillis()); + } + lastActivityTime.set(System.currentTimeMillis()); + } + }); + } catch (Exception e) { + LOGGER.warn("threadpool rejected the incoming file transfer", e); + return false; + } + return true; + } + + @Override + public synchronized void cancel() { + isCanceled = true; + synchronized (uploads) { + for (Uploader u : uploads) { + u.cancel(); + } + } + } + + @Override + public boolean isActive() { + return !isCanceled; + } + + @Override + public int getActiveConnectionCount() { + return uploads.size(); + } + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java new file mode 100644 index 00000000..82c4ab9e --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java @@ -0,0 +1,132 @@ +package org.openslx.bwlp.sat.fileserv; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.openslx.bwlp.sat.util.Constants; +import org.openslx.bwlp.sat.util.Formatter; +import org.openslx.bwlp.thrift.iface.ImagePublishData; +import org.openslx.bwlp.thrift.iface.InvocationError; +import org.openslx.bwlp.thrift.iface.TAuthorizationException; +import org.openslx.bwlp.thrift.iface.TInvocationException; +import org.openslx.bwlp.thrift.iface.TNotFoundException; +import org.openslx.bwlp.thrift.iface.TransferInformation; +import org.openslx.thrifthelper.ThriftManager; +import org.openslx.util.QuickTimer; +import org.openslx.util.QuickTimer.Task; + +/** + * Manages file transfers between this satellite and the master server. + */ +public class SyncTransferHandler { + + private static final Logger LOGGER = Logger.getLogger(SyncTransferHandler.class); + + private static final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(1, + Constants.MAX_MASTER_UPLOADS + Constants.MAX_MASTER_DOWNLOADS, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue(1)); + + /** + * All currently running uploads, indexed by token + */ + private static final Map downloads = new ConcurrentHashMap<>(); + + /** + * All currently running downloads, indexed by token + */ + private static final Map uploads = new ConcurrentHashMap<>(); + + private static Task heartBeatTask = new Task() { + private final Runnable worker = new Runnable() { + @Override + public void run() { + for (IncomingDataTransfer download : downloads.values()) { + if (download.isActive()) + download.heartBeat(transferPool); + } + for (OutgoingDataTransfer upload : uploads.values()) { + if (upload.isActive()) + upload.heartBeat(transferPool); + } + } + }; + + @Override + public void fire() { + if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 1) + return; + transferPool.execute(worker); + } + }; + + // + + static { + QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56)); + } + + public synchronized static String requestImageDownload(ImagePublishData image) + throws TInvocationException, TAuthorizationException, TNotFoundException { + TransferInformation transferInfo; + // Already replicating this one? + if (downloads.containsKey(image.imageVersionId)) + return image.imageVersionId; + checkDownloadCount(); + try { + transferInfo = ThriftManager.getMasterClient().downloadImage(null, image.imageVersionId); + } catch (TAuthorizationException e) { + LOGGER.warn("Master server rejected our session on downloadImage", e); + throw e; + } catch (TNotFoundException e) { + LOGGER.warn("Master server couldn't find image on downloadImage", e); + throw e; + } catch (TException e) { + LOGGER.warn("Master server made a boo-boo on downloadImage", e); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Communication with master server failed"); + } + File tmpFile = null; + do { + tmpFile = Formatter.getTempImageName(); + } while (tmpFile.exists()); + tmpFile.getParentFile().mkdirs(); + try { + IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo); + downloads.put(transfer.getId(), transfer); + return transfer.getId(); + } catch (FileNotFoundException e) { + LOGGER.warn("Could not open " + tmpFile.getAbsolutePath()); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Could not access local file for writing"); + } + } + + private static void checkDownloadCount() throws TInvocationException { + Iterator it = downloads.values().iterator(); + final long now = System.currentTimeMillis(); + int activeDownloads = 0; + while (it.hasNext()) { + IncomingDataTransfer upload = it.next(); + if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) { + upload.cancel(); + it.remove(); + continue; + } + activeDownloads++; + } + if (activeDownloads > Constants.MAX_MASTER_DOWNLOADS) { + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Server busy. Too many running downloads (" + activeDownloads + "/" + + Constants.MAX_MASTER_DOWNLOADS + ")."); + } + } + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java index 29fa5613..0fc8a365 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/permissions/User.java @@ -331,6 +331,27 @@ public class User { "Only the super user can change the expire date of images"); } + public static void canTriggerReplicationOrFail(UserInfo user, String imageVersionId) + throws TAuthorizationException, TInvocationException { + if (isTutor(user)) { + ImageSummaryRead image; + try { + image = getImageFromVersionId(user, imageVersionId); + } catch (TNotFoundException e) { + // If the image is not known locally, allow replication + return; + } + // If it's a remote image, or if the user has edit permissions, allow + if (image.shareMode == ShareMode.DOWNLOAD || image.shareMode == ShareMode.FROZEN + || image.userPermissions.edit) + return; + throw new TAuthorizationException(AuthorizationError.NO_PERMISSION, + "You cannot trigger downloading an image to the satellite server that is not in replication mode"); + } + throw new TAuthorizationException(AuthorizationError.NO_PERMISSION, + "Only tutors can trigger image replication"); + } + public static void setCombinedUserPermissions(ImageSummaryRead image, UserInfo user) { if (hasAllImagePermissions(user, image.ownerId)) { image.userPermissions = imageSu; @@ -435,6 +456,15 @@ public class User { } } + private static ImageSummaryRead getImageFromVersionId(UserInfo user, String imageVersionId) + throws TNotFoundException, TInvocationException { + try { + return DbImage.getImageSummary(user, DbImage.getBaseIdForVersionId(imageVersionId)); + } catch (SQLException e) { + throw new TInvocationException(); + } + } + private static LectureSummary getLectureFromId(UserInfo user, String lectureId) throws TNotFoundException, TInvocationException { try { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java index 533c8acf..3b76efcd 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java @@ -2,7 +2,6 @@ package org.openslx.bwlp.sat.thrift; import java.nio.ByteBuffer; import java.sql.SQLException; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -15,9 +14,10 @@ import org.openslx.bwlp.sat.database.mappers.DbLecture; import org.openslx.bwlp.sat.database.mappers.DbLecturePermissions; import org.openslx.bwlp.sat.database.mappers.DbUser; import org.openslx.bwlp.sat.database.models.LocalUser; -import org.openslx.bwlp.sat.fileserv.ActiveDownload; -import org.openslx.bwlp.sat.fileserv.ActiveUpload; import org.openslx.bwlp.sat.fileserv.FileServer; +import org.openslx.bwlp.sat.fileserv.IncomingDataTransfer; +import org.openslx.bwlp.sat.fileserv.OutgoingDataTransfer; +import org.openslx.bwlp.sat.fileserv.SyncTransferHandler; import org.openslx.bwlp.sat.permissions.User; import org.openslx.bwlp.sat.thrift.cache.OperatingSystemList; import org.openslx.bwlp.sat.thrift.cache.OrganizationList; @@ -29,6 +29,7 @@ import org.openslx.bwlp.thrift.iface.AuthorizationError; import org.openslx.bwlp.thrift.iface.ImageBaseWrite; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImagePermissions; +import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.ImageSummaryRead; import org.openslx.bwlp.thrift.iface.ImageVersionDetails; import org.openslx.bwlp.thrift.iface.ImageVersionWrite; @@ -56,6 +57,7 @@ import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.bwlp.thrift.iface.Virtualizer; import org.openslx.bwlp.thrift.iface.WhoamiInfo; import org.openslx.sat.thrift.version.Version; +import org.openslx.thrifthelper.ThriftManager; public class ServerHandler implements SatelliteServer.Iface { @@ -77,18 +79,6 @@ public class ServerHandler implements SatelliteServer.Iface { * File Transfer */ - private List unwrapHashes(List blockHashes) { - if (blockHashes == null || blockHashes.isEmpty()) - return null; - List hashList = new ArrayList<>(blockHashes.size()); - for (ByteBuffer hash : blockHashes) { - byte[] buffer = new byte[hash.remaining()]; - hash.get(buffer); - hashList.add(buffer); - } - return hashList; - } - @Override public TransferInformation requestImageVersionUpload(String userToken, String imageBaseId, long fileSize, List blockHashes, ByteBuffer machineDescription) throws TTransferRejectedException, @@ -101,31 +91,30 @@ public class ServerHandler implements SatelliteServer.Iface { } catch (SQLException e) { throw new TInvocationException(); } + if (image.shareMode != ShareMode.LOCAL && image.shareMode != ShareMode.PUBLISH) + throw new TAuthorizationException(AuthorizationError.NO_PERMISSION, + "Cannot upload new versions of a replicated image"); // Unwrap machine description - byte[] mDesc = null; - if (machineDescription != null) { - mDesc = new byte[machineDescription.remaining()]; - machineDescription.get(mDesc); - } + byte[] mDesc = ThriftUtil.unwrapByteBuffer(machineDescription); // Unwrap sha1sum list - List hashList = unwrapHashes(blockHashes); - ActiveUpload transfer = fileServer.createNewUserUpload(user, image, fileSize, hashList, mDesc); + List hashList = ThriftUtil.unwrapByteBufferList(blockHashes); + IncomingDataTransfer transfer = fileServer.createNewUserUpload(user, image, fileSize, hashList, mDesc); return new TransferInformation(transfer.getId(), fileServer.getPlainPort(), fileServer.getSslPort()); } @Override public void updateBlockHashes(String uploadToken, List blockHashes) throws TInvalidTokenException { - ActiveUpload upload = fileServer.getUploadByToken(uploadToken); + IncomingDataTransfer upload = fileServer.getUploadByToken(uploadToken); if (upload == null) throw new TInvalidTokenException(); - List hashList = unwrapHashes(blockHashes); + List hashList = ThriftUtil.unwrapByteBufferList(blockHashes); upload.updateBlockHashList(hashList); } @Override public void cancelUpload(String uploadToken) { - ActiveUpload upload = fileServer.getUploadByToken(uploadToken); + IncomingDataTransfer upload = fileServer.getUploadByToken(uploadToken); if (upload != null) upload.cancel(); @@ -133,7 +122,7 @@ public class ServerHandler implements SatelliteServer.Iface { @Override public TransferStatus queryUploadStatus(String uploadToken) throws TInvalidTokenException { - ActiveUpload upload = fileServer.getUploadByToken(uploadToken); + IncomingDataTransfer upload = fileServer.getUploadByToken(uploadToken); if (upload == null) throw new TInvalidTokenException(); return upload.getStatus(); @@ -145,7 +134,7 @@ public class ServerHandler implements SatelliteServer.Iface { TTransferRejectedException { UserInfo user = SessionManager.getOrFail(userToken); User.canDownloadImageVersionOrFail(user, imageVersionId); - ActiveDownload transfer; + OutgoingDataTransfer transfer; try { transfer = fileServer.createNewUserDownload(DbImage.getLocalImageData(imageVersionId)); } catch (SQLException e) { @@ -156,7 +145,7 @@ public class ServerHandler implements SatelliteServer.Iface { @Override public void cancelDownload(String downloadToken) { - ActiveDownload download = fileServer.getDownloadByToken(downloadToken); + OutgoingDataTransfer download = fileServer.getDownloadByToken(downloadToken); if (download != null) download.cancel(); } @@ -312,7 +301,7 @@ public class ServerHandler implements SatelliteServer.Iface { throws TAuthorizationException, TInvocationException, TNotFoundException { UserInfo user = SessionManager.getOrFail(userToken); // Special case: Version is still being uploaded, so there's no entry yet - remember for later - ActiveUpload upload = fileServer.getUploadByToken(imageVersionId); + IncomingDataTransfer upload = fileServer.getUploadByToken(imageVersionId); if (upload != null && upload.setVersionData(user, image)) { return; } @@ -421,8 +410,27 @@ public class ServerHandler implements SatelliteServer.Iface { @Override public String requestImageReplication(String userToken, String imageVersionId) throws TAuthorizationException, TNotFoundException, TInvocationException { - // TODO Auto-generated method stub - return null; + UserInfo user = SessionManager.getOrFail(userToken); + User.canTriggerReplicationOrFail(user, imageVersionId); + // Query master server + ImagePublishData imagePublishData; + try { + imagePublishData = ThriftManager.getMasterClient().getImageData(userToken, imageVersionId); + } catch (TException e) { + LOGGER.error( + "Could not query image data from master server for an image that a client wants to replicate", + e); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Cannot query master server for image information"); + } + // Known by master server; now update/write to DB + try { + DbImage.writeBaseImage(user, imagePublishData); + } catch (SQLException e) { + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Could not write to local DB"); + } + return SyncTransferHandler.requestImageDownload(imagePublishData); } @Override diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java new file mode 100644 index 00000000..ee02f440 --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java @@ -0,0 +1,30 @@ +package org.openslx.bwlp.sat.thrift; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class ThriftUtil { + + public static List unwrapByteBufferList(List blockHashes) { + if (blockHashes == null || blockHashes.isEmpty()) + return null; + List hashList = new ArrayList<>(blockHashes.size()); + for (ByteBuffer hash : blockHashes) { + byte[] buffer = new byte[hash.remaining()]; + hash.get(buffer); + hashList.add(buffer); + } + return hashList; + } + + public static byte[] unwrapByteBuffer(ByteBuffer buffer) { + byte[] byteArray = null; + if (buffer != null) { + byteArray = new byte[buffer.remaining()]; + buffer.get(byteArray); + } + return byteArray; + } + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java index b02d73d0..8afc6a8c 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java @@ -20,6 +20,7 @@ public class Configuration { private static String dbUri; private static String dbUsername; private static String dbPassword; + private static String masterAddress; public static boolean load() throws IOException { // Load configuration from java properties file @@ -36,6 +37,7 @@ public class Configuration { dbUri = prop.getProperty("db.uri"); dbUsername = prop.getProperty("db.username"); dbPassword = prop.getProperty("db.password"); + masterAddress = prop.getProperty("master.address"); // Currently all fields are mandatory but there might be optional settings in the future return vmStoreBasePath != null && dbUri != null && dbUsername != null && dbPassword != null; @@ -69,6 +71,10 @@ public class Configuration { return vmStoreProdPath; } + public static String getMasterServerAddress() { + return masterAddress; + } + // Dynamically Computed fields public static File getCurrentVmStorePath() { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java index 774d1c88..0af135ac 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java @@ -1,22 +1,50 @@ package org.openslx.bwlp.sat.util; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; + +import org.apache.log4j.Logger; import org.openslx.filetransfer.util.FileChunk; public class Constants { + + private static final Logger LOGGER = Logger.getLogger(Constants.class); + public static final String INCOMPLETE_UPLOAD_SUFFIX = ".part"; public static final int MAX_UPLOADS; public static final int MAX_DOWNLOADS; + public static final int MAX_MASTER_UPLOADS = 2; + public static final int MAX_MASTER_DOWNLOADS = 3; public static final int TRANSFER_TIMEOUT = 15 * 1000; // 15s static { long maxMem = Runtime.getRuntime().maxMemory(); if (maxMem == Long.MAX_VALUE) { // Apparently the JVM was started without a memory limit (no -Xmx cmdline), - // so we assume a default of 512MB - maxMem = 512l * 1024l * 1024l; + // so we try a dirty little trick by assuming this is linux and reading it + // from the /proc file system. If that fails too, assume a default of 512MB + try (BufferedReader br = new BufferedReader(new FileReader("/proc/meminfo"))) { + for (String line; (line = br.readLine()) != null;) { + if (line.startsWith("MemTotal:") && line.endsWith("kB")) { + String string = line.replaceAll("[^0-9]", ""); + try { + maxMem = (Long.parseLong(string) / 2l) * 1024l; + LOGGER.debug("Guessing usable JVM memory via /proc/meminfo"); + } catch (Exception e) { + } + break; + } + } + } catch (IOException e) { + } + if (maxMem == Long.MAX_VALUE) { + maxMem = 512l * 1024l * 1024l; + } } maxMem /= 1024l * 1024l; // Now maxMem is the amount of memory in MiB + LOGGER.debug("Maximum JVM memory: " + maxMem + "MiB"); MAX_UPLOADS = (int) Math.max(1, (maxMem - 64) / (FileChunk.CHUNK_SIZE_MIB + 1)); MAX_DOWNLOADS = MAX_UPLOADS * 2; -- cgit v1.2.3-55-g7522 From 3fb192b673699167ad4f1d044ebf1fdd3bce1483 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 28 Aug 2015 18:12:33 +0200 Subject: [server] Tweak calculation of transfer count limits --- .../src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java | 4 ++-- .../main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java | 2 +- .../main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java | 2 +- .../src/main/java/org/openslx/bwlp/sat/util/Constants.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) (limited to 'dozentenmodulserver/src/main/java') diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java index f821813f..a08d4e15 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java @@ -135,7 +135,7 @@ public class FileServer implements IncomingEvent { it.remove(); continue; } - activeUploads++; + activeUploads += Math.max(1, upload.getActiveConnectionCount()); } if (activeUploads > Constants.MAX_UPLOADS) { throw new TTransferRejectedException("Server busy. Too many running uploads (" + activeUploads @@ -185,7 +185,7 @@ public class FileServer implements IncomingEvent { it.remove(); continue; } - activeDownloads++; + activeDownloads += Math.max(1, download.getActiveConnectionCount()); } if (activeDownloads > Constants.MAX_DOWNLOADS) { throw new TTransferRejectedException("Server busy. Too many running uploads (" + activeDownloads diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java index 0ca5d9b6..e4e3349e 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java @@ -46,7 +46,7 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC /** * How many concurrent connections per upload */ - private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_UPLOADS / 4, 1); + private static final int MAX_CONNECTIONS = Math.min(4, Math.max(Constants.MAX_UPLOADS / 4, 1)); /** * Self reference for inner classes. diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java index e7c6715f..b01a627f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java @@ -16,7 +16,7 @@ public class OutgoingDataTransfer extends AbstractTransfer { /** * How many concurrent connections per download */ - private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_DOWNLOADS / 4, 1); + private static final int MAX_CONNECTIONS = Math.min(4, Math.max(Constants.MAX_DOWNLOADS / 4, 1)); /** * Remote peer is downloading, so we have Uploaders diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java index 0af135ac..55a87b5f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java @@ -46,7 +46,7 @@ public class Constants { // Now maxMem is the amount of memory in MiB LOGGER.debug("Maximum JVM memory: " + maxMem + "MiB"); - MAX_UPLOADS = (int) Math.max(1, (maxMem - 64) / (FileChunk.CHUNK_SIZE_MIB + 1)); + MAX_UPLOADS = (int) Math.max(1, (maxMem - 64) / (FileChunk.CHUNK_SIZE_MIB * 2)); MAX_DOWNLOADS = MAX_UPLOADS * 2; } } -- cgit v1.2.3-55-g7522