diff options
author | Simon Rettberg | 2015-08-28 18:04:16 +0200 |
---|---|---|
committer | Simon Rettberg | 2015-08-28 18:04:16 +0200 |
commit | 10f0687fe551bda88120c2dc2b003035dd9bbea8 (patch) | |
tree | a9a3103c5ca1981bad169a6a1527f0252c4bc76f /dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java | |
parent | [client] save the selected download folder and not the generated folder (diff) | |
download | tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.gz tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.xz tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.zip |
[server] Working on image download from master server
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java')
-rw-r--r-- | dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java | 537 |
1 files changed, 537 insertions, 0 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java new file mode 100644 index 00000000..0ca5d9b6 --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java @@ -0,0 +1,537 @@ +package org.openslx.bwlp.sat.fileserv; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.charset.StandardCharsets; +import java.security.NoSuchAlgorithmException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.SSLContext; + +import org.apache.log4j.Logger; +import org.openslx.bwlp.sat.database.mappers.DbImage; +import org.openslx.bwlp.sat.thrift.ThriftUtil; +import org.openslx.bwlp.sat.util.Configuration; +import org.openslx.bwlp.sat.util.Constants; +import org.openslx.bwlp.sat.util.FileSystem; +import org.openslx.bwlp.sat.util.Formatter; +import org.openslx.bwlp.sat.util.Util; +import org.openslx.bwlp.thrift.iface.ImageDetailsRead; +import org.openslx.bwlp.thrift.iface.ImagePublishData; +import org.openslx.bwlp.thrift.iface.ImageVersionWrite; +import org.openslx.bwlp.thrift.iface.TransferInformation; +import org.openslx.bwlp.thrift.iface.TransferState; +import org.openslx.bwlp.thrift.iface.TransferStatus; +import org.openslx.bwlp.thrift.iface.UserInfo; +import org.openslx.filetransfer.DataReceivedCallback; +import org.openslx.filetransfer.Downloader; +import org.openslx.filetransfer.FileRange; +import org.openslx.filetransfer.WantRangeCallback; +import org.openslx.filetransfer.util.ChunkList; +import org.openslx.filetransfer.util.FileChunk; +import org.openslx.filetransfer.util.HashChecker; +import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; +import org.openslx.filetransfer.util.HashChecker.HashResult; + +public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback { + + private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class); + + /** + * How many concurrent connections per upload + */ + private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_UPLOADS / 4, 1); + + /** + * Self reference for inner classes. + */ + private final IncomingDataTransfer activeUpload = this; + + /** + * Remote peer is uploading, so on our end, we have Downloaders + */ + private List<Downloader> downloads = new ArrayList<>(); + + private final File tmpFileName; + + private final RandomAccessFile tmpFileHandle; + + private final ChunkList chunks; + + private final long fileSize; + + /** + * User owning this uploaded file. + */ + private final UserInfo owner; + + /** + * Base image this upload is a new version for. + */ + private final ImageDetailsRead image; + + /** + * Flags to set for this new image version. Optional field. + */ + private ImageVersionWrite versionSettings = null; + + /** + * TransferState of this upload + */ + private TransferState state = TransferState.IDLE; + + /** + * Description of this VM - binary dump of e.g. the *.vmx file (VMware) + */ + private final byte[] machineDescription; + + /** + * Indicated whether the version information was written to db already. + * Disallow setVersionData in that case. + */ + private final AtomicBoolean versionWrittenToDb = new AtomicBoolean(); + + /** + * Whether file is (still) writable. Used for the file transfer callbacks. + */ + private boolean fileWritable = true; + + /** + * Set if this is a download from the master server + */ + private final TransferInformation masterTransferInfo; + + private static final HashChecker hashChecker; + + static { + HashChecker hc; + try { + hc = new HashChecker("SHA-1"); + } catch (NoSuchAlgorithmException e) { + hc = null; + } + hashChecker = hc; + } + + public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image, + File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) + throws FileNotFoundException { + super(uploadId); + this.tmpFileName = destinationFile; + this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw"); + this.chunks = new ChunkList(fileSize, hashChecker == null ? null : sha1Sums); + this.owner = owner; + this.image = image; + this.fileSize = fileSize; + this.machineDescription = machineDescription; + this.masterTransferInfo = null; + } + + public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo) + throws FileNotFoundException { + super(publishData.imageVersionId); + ImageDetailsRead idr = new ImageDetailsRead(); + idr.setCreateTime(publishData.createTime); + idr.setDescription(publishData.description); + idr.setImageBaseId(publishData.imageBaseId); + idr.setImageName(publishData.imageName); + idr.setIsTemplate(publishData.isTemplate); + idr.setLatestVersionId(publishData.imageVersionId); + idr.setOsId(publishData.osId); + idr.setOwnerId(publishData.user.userId); + idr.setTags(publishData.tags); + idr.setUpdaterId(publishData.user.userId); + idr.setUpdateTime(publishData.createTime); + idr.setVirtId(publishData.virtId); + this.tmpFileName = tmpFile; + this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw"); + this.chunks = new ChunkList(publishData.fileSize, hashChecker == null ? null + : ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes)); + this.owner = publishData.user; + this.image = idr; + this.fileSize = publishData.fileSize; + this.machineDescription = transferInfo.machineDescription.getBytes(StandardCharsets.UTF_8); + this.masterTransferInfo = transferInfo; + this.versionSettings = new ImageVersionWrite(false); + } + + /** + * Called periodically if this is a transfer from the master server, so we + * can make sure the transfer is running. + */ + public void heartBeat(ThreadPoolExecutor pool) { + if (masterTransferInfo == null) + return; + synchronized (this) { + synchronized (downloads) { + if (downloads.size() >= 1) // TODO What to pick here? + return; + } + Downloader downloader = null; + if (masterTransferInfo.plainPort != 0) { + try { + downloader = new Downloader(Configuration.getMasterServerAddress(), + masterTransferInfo.plainPort, Constants.TRANSFER_TIMEOUT, null, + masterTransferInfo.token); + } catch (Exception e1) { + LOGGER.debug("Plain connect failed", e1); + downloader = null; + } + } + if (downloader == null && masterTransferInfo.sslPort != 0) { + try { + downloader = new Downloader(Configuration.getMasterServerAddress(), + masterTransferInfo.sslPort, Constants.TRANSFER_TIMEOUT, SSLContext.getDefault(), // TODO: Use the TLSv1.2 one once the master is ready + masterTransferInfo.token); + } catch (Exception e2) { + LOGGER.debug("SSL connect failed", e2); + downloader = null; + } + } + if (downloader == null) { + LOGGER.warn("Could not connect to master server for downloading " + image.imageName); + return; + } + addConnection(downloader, pool); + } + } + + /** + * Set meta data for this image version. + * + * @param user + * + * @param data + */ + public boolean setVersionData(UserInfo user, ImageVersionWrite data) { + synchronized (versionWrittenToDb) { + if (versionWrittenToDb.get()) { + return false; + } + if (!user.userId.equals(owner.userId)) { + return false; + } + versionSettings = data; + return true; + } + } + + /** + * Add another connection for this file transfer. Currently only one + * connection is allowed, but this might change in the future. + * + * @param connection + * @return true if the connection is accepted, false if it should be + * discarded + */ + public synchronized boolean addConnection(final Downloader connection, ThreadPoolExecutor pool) { + if (state == TransferState.FINISHED || state == TransferState.ERROR) + return false; + synchronized (downloads) { + if (downloads.size() >= MAX_CONNECTIONS) + return false; + downloads.add(connection); + } + try { + pool.execute(new Runnable() { + @Override + public void run() { + CbHandler cbh = new CbHandler(); + if (!connection.download(cbh, cbh)) { + if (cbh.currentChunk != null) { + // If the download failed and we have a current chunk, put it back into + // the queue, so it will be handled again later... + chunks.markFailed(cbh.currentChunk); + } + LOGGER.warn("Download of " + tmpFileName.getAbsolutePath() + " failed"); + } + lastActivityTime.set(System.currentTimeMillis()); + synchronized (downloads) { + downloads.remove(connection); + } + if (chunks.isComplete()) { + finishUpload(); + } + } + }); + if (state == TransferState.IDLE) { + state = TransferState.WORKING; + } + } catch (Exception e) { + LOGGER.warn("threadpool rejected the incoming file transfer", e); + return false; + } + return true; + } + + /** + * Write some data to the local file. Thread safe so we can + * have multiple concurrent connections. + * + * @param fileOffset + * @param dataLength + * @param data + * @return + */ + private void writeFileData(long fileOffset, int dataLength, byte[] data) { + if (state != TransferState.WORKING) + throw new IllegalStateException("Cannot write to file if state != RUNNING"); + synchronized (tmpFileHandle) { + try { + tmpFileHandle.seek(fileOffset); + tmpFileHandle.write(data, 0, dataLength); + } catch (IOException e) { + LOGGER.error("Cannot write to '" + tmpFileName + + "'. Disk full, network storage error, bad permissions, ...?", e); + fileWritable = false; + state = TransferState.ERROR; + } + } + } + + /** + * Called when the upload finished. + */ + private synchronized void finishUpload() { + synchronized (tmpFileHandle) { + if (state != TransferState.WORKING) + return; + Util.safeClose(tmpFileHandle); + state = TransferState.FINISHED; + } + potentialFinishTime.set(System.currentTimeMillis()); + // If owner is not set, this was a repair-transfer, which downloads directly to the existing target file. + // Nothing more to do in that case. + if (isRepairUpload()) + return; + LOGGER.info("Finalizing uploaded image " + image.imageName); + // Ready to go. First step: Rename temp file to something usable + File destination = new File(tmpFileName.getParent(), Formatter.vmName(owner, image.imageName)); + // Sanity check: destination should be a sub directory of the vmStorePath + String relPath = FileSystem.getRelativePath(destination, Configuration.getVmStoreBasePath()); + if (relPath == null) { + LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of " + + Configuration.getVmStoreBasePath().getAbsolutePath()); + cancel(); + return; + } + if (relPath.length() > 200) { + LOGGER.error("Generated file name is >200 chars. DB will not like it"); + } + + // Execute rename + boolean ret = false; + Exception renameException = null; + try { + ret = tmpFileName.renameTo(destination); + } catch (Exception e) { + ret = false; + renameException = e; + } + if (!ret) { + // Rename failed :-( + LOGGER.warn( + "Could not rename '" + tmpFileName.getAbsolutePath() + "' to '" + + destination.getAbsolutePath() + "'", renameException); + cancel(); + return; + } + + // Now insert meta data into DB + try { + synchronized (versionWrittenToDb) { + DbImage.createImageVersion(image.imageBaseId, getId(), owner, fileSize, relPath, + versionSettings, chunks, machineDescription); + versionWrittenToDb.set(true); + } + } catch (SQLException e) { + LOGGER.error("Error finishing upload: Inserting version to DB failed", e); + state = TransferState.ERROR; + // Also delete uploaded file, as there is no refence to it + FileSystem.deleteAsync(destination); + cancel(); + return; + } + } + + @Override + public synchronized void cancel() { + if (state != TransferState.FINISHED) { + state = TransferState.ERROR; + if (!isRepairUpload() && tmpFileName.exists()) { + FileSystem.deleteAsync(tmpFileName); + } + } + synchronized (downloads) { + for (Downloader download : downloads) { + download.cancel(); + } + } + } + + public boolean isRepairUpload() { + return owner == null; + } + + /** + * Get user owning this upload. Can be null in special cases. + * + * @return instance of UserInfo for the according user. + */ + public UserInfo getOwner() { + return this.owner; + } + + public File getDestinationFile() { + return this.tmpFileName; + } + + public long getSize() { + return this.fileSize; + } + + /** + * Callback class for an instance of the Downloader, which supplies + * the Downloader with wanted file ranges, and handles incoming data. + */ + private class CbHandler implements WantRangeCallback, DataReceivedCallback { + /** + * The current chunk being transfered. + */ + private FileChunk currentChunk = null; + private byte[] buffer = new byte[FileChunk.CHUNK_SIZE]; + + @Override + public boolean dataReceived(long fileOffset, int dataLength, byte[] data) { + if (currentChunk == null) + throw new IllegalStateException("dataReceived without current chunk"); + if (!currentChunk.range.contains(fileOffset, fileOffset + dataLength)) + throw new IllegalStateException("dataReceived with file data out of range"); + System.arraycopy(data, 0, buffer, (int) (fileOffset - currentChunk.range.startOffset), dataLength); + return fileWritable; + } + + @Override + public FileRange get() { + if (currentChunk != null) { + if (hashChecker != null && currentChunk.getSha1Sum() != null) { + try { + hashChecker.queue(currentChunk, buffer, activeUpload); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + buffer = new byte[buffer.length]; + } else { + writeFileData(currentChunk.range.startOffset, currentChunk.range.getLength(), buffer); + chunks.markSuccessful(currentChunk); + } + } + // Get next missing chunk + try { + currentChunk = chunks.getMissing(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + if (currentChunk == null) { + return null; // No more chunks, returning null tells the Downloader we're done. + } + return currentChunk.range; + } + } + + public synchronized TransferStatus getStatus() { + return new TransferStatus(chunks.getStatusArray(), state); + } + + @Override + public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) { + switch (result) { + case FAILURE: + LOGGER.warn("Hash check of chunk " + chunk.toString() + + " could not be executed. Assuming valid :-("); + // Fall through + case VALID: + writeFileData(chunk.range.startOffset, chunk.range.getLength(), data); + chunks.markSuccessful(chunk); + if (chunks.isComplete()) { + finishUpload(); + } + break; + case INVALID: + LOGGER.warn("Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch " + + chunk.getFailCount() + "x :-("); + chunks.markFailed(chunk); + break; + } + } + + private byte[] loadChunkFromFile(FileChunk chunk) { + synchronized (tmpFileHandle) { + if (state != TransferState.IDLE && state != TransferState.WORKING) + return null; + try { + tmpFileHandle.seek(chunk.range.startOffset); + byte[] buffer = new byte[chunk.range.getLength()]; + tmpFileHandle.readFully(buffer); + return buffer; + } catch (IOException e) { + LOGGER.error( + "Could not read chunk " + chunk.getChunkIndex() + " of File " + + tmpFileName.toString(), e); + return null; + } + } + } + + public void updateBlockHashList(List<byte[]> hashList) { + if (state != TransferState.IDLE && state != TransferState.WORKING) + return; + if (hashChecker == null) + return; + chunks.updateSha1Sums(hashList); + FileChunk chunk; + while (null != (chunk = chunks.getUnhashedComplete())) { + byte[] data = loadChunkFromFile(chunk); + if (data == null) { + LOGGER.warn("Will mark unloadable chunk as valid :-("); + chunks.markSuccessful(chunk); + continue; + } + try { + hashChecker.queue(chunk, data, this); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + + @Override + public boolean isActive() { + return state == TransferState.IDLE || state == TransferState.WORKING; + } + + @Override + public int getActiveConnectionCount() { + return downloads.size(); + } + + @Override + protected void finalize() { + try { + Util.safeClose(tmpFileHandle); + if (tmpFileName.exists()) + tmpFileName.delete(); + } catch (Throwable t) { + } + } + +} |