summaryrefslogtreecommitdiffstats
path: root/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
diff options
context:
space:
mode:
authorSimon Rettberg2015-08-28 18:04:16 +0200
committerSimon Rettberg2015-08-28 18:04:16 +0200
commit10f0687fe551bda88120c2dc2b003035dd9bbea8 (patch)
treea9a3103c5ca1981bad169a6a1527f0252c4bc76f /dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
parent[client] save the selected download folder and not the generated folder (diff)
downloadtutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.gz
tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.tar.xz
tutor-module-10f0687fe551bda88120c2dc2b003035dd9bbea8.zip
[server] Working on image download from master server
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java')
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java537
1 files changed, 537 insertions, 0 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
new file mode 100644
index 00000000..0ca5d9b6
--- /dev/null
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
@@ -0,0 +1,537 @@
+package org.openslx.bwlp.sat.fileserv;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.StandardCharsets;
+import java.security.NoSuchAlgorithmException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.log4j.Logger;
+import org.openslx.bwlp.sat.database.mappers.DbImage;
+import org.openslx.bwlp.sat.thrift.ThriftUtil;
+import org.openslx.bwlp.sat.util.Configuration;
+import org.openslx.bwlp.sat.util.Constants;
+import org.openslx.bwlp.sat.util.FileSystem;
+import org.openslx.bwlp.sat.util.Formatter;
+import org.openslx.bwlp.sat.util.Util;
+import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
+import org.openslx.bwlp.thrift.iface.ImagePublishData;
+import org.openslx.bwlp.thrift.iface.ImageVersionWrite;
+import org.openslx.bwlp.thrift.iface.TransferInformation;
+import org.openslx.bwlp.thrift.iface.TransferState;
+import org.openslx.bwlp.thrift.iface.TransferStatus;
+import org.openslx.bwlp.thrift.iface.UserInfo;
+import org.openslx.filetransfer.DataReceivedCallback;
+import org.openslx.filetransfer.Downloader;
+import org.openslx.filetransfer.FileRange;
+import org.openslx.filetransfer.WantRangeCallback;
+import org.openslx.filetransfer.util.ChunkList;
+import org.openslx.filetransfer.util.FileChunk;
+import org.openslx.filetransfer.util.HashChecker;
+import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
+import org.openslx.filetransfer.util.HashChecker.HashResult;
+
+public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback {
+
+ private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class);
+
+ /**
+ * How many concurrent connections per upload
+ */
+ private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_UPLOADS / 4, 1);
+
+ /**
+ * Self reference for inner classes.
+ */
+ private final IncomingDataTransfer activeUpload = this;
+
+ /**
+ * Remote peer is uploading, so on our end, we have Downloaders
+ */
+ private List<Downloader> downloads = new ArrayList<>();
+
+ private final File tmpFileName;
+
+ private final RandomAccessFile tmpFileHandle;
+
+ private final ChunkList chunks;
+
+ private final long fileSize;
+
+ /**
+ * User owning this uploaded file.
+ */
+ private final UserInfo owner;
+
+ /**
+ * Base image this upload is a new version for.
+ */
+ private final ImageDetailsRead image;
+
+ /**
+ * Flags to set for this new image version. Optional field.
+ */
+ private ImageVersionWrite versionSettings = null;
+
+ /**
+ * TransferState of this upload
+ */
+ private TransferState state = TransferState.IDLE;
+
+ /**
+ * Description of this VM - binary dump of e.g. the *.vmx file (VMware)
+ */
+ private final byte[] machineDescription;
+
+ /**
+ * Indicated whether the version information was written to db already.
+ * Disallow setVersionData in that case.
+ */
+ private final AtomicBoolean versionWrittenToDb = new AtomicBoolean();
+
+ /**
+ * Whether file is (still) writable. Used for the file transfer callbacks.
+ */
+ private boolean fileWritable = true;
+
+ /**
+ * Set if this is a download from the master server
+ */
+ private final TransferInformation masterTransferInfo;
+
+ private static final HashChecker hashChecker;
+
+ static {
+ HashChecker hc;
+ try {
+ hc = new HashChecker("SHA-1");
+ } catch (NoSuchAlgorithmException e) {
+ hc = null;
+ }
+ hashChecker = hc;
+ }
+
+ public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image,
+ File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription)
+ throws FileNotFoundException {
+ super(uploadId);
+ this.tmpFileName = destinationFile;
+ this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw");
+ this.chunks = new ChunkList(fileSize, hashChecker == null ? null : sha1Sums);
+ this.owner = owner;
+ this.image = image;
+ this.fileSize = fileSize;
+ this.machineDescription = machineDescription;
+ this.masterTransferInfo = null;
+ }
+
+ public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo)
+ throws FileNotFoundException {
+ super(publishData.imageVersionId);
+ ImageDetailsRead idr = new ImageDetailsRead();
+ idr.setCreateTime(publishData.createTime);
+ idr.setDescription(publishData.description);
+ idr.setImageBaseId(publishData.imageBaseId);
+ idr.setImageName(publishData.imageName);
+ idr.setIsTemplate(publishData.isTemplate);
+ idr.setLatestVersionId(publishData.imageVersionId);
+ idr.setOsId(publishData.osId);
+ idr.setOwnerId(publishData.user.userId);
+ idr.setTags(publishData.tags);
+ idr.setUpdaterId(publishData.user.userId);
+ idr.setUpdateTime(publishData.createTime);
+ idr.setVirtId(publishData.virtId);
+ this.tmpFileName = tmpFile;
+ this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw");
+ this.chunks = new ChunkList(publishData.fileSize, hashChecker == null ? null
+ : ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes));
+ this.owner = publishData.user;
+ this.image = idr;
+ this.fileSize = publishData.fileSize;
+ this.machineDescription = transferInfo.machineDescription.getBytes(StandardCharsets.UTF_8);
+ this.masterTransferInfo = transferInfo;
+ this.versionSettings = new ImageVersionWrite(false);
+ }
+
+ /**
+ * Called periodically if this is a transfer from the master server, so we
+ * can make sure the transfer is running.
+ */
+ public void heartBeat(ThreadPoolExecutor pool) {
+ if (masterTransferInfo == null)
+ return;
+ synchronized (this) {
+ synchronized (downloads) {
+ if (downloads.size() >= 1) // TODO What to pick here?
+ return;
+ }
+ Downloader downloader = null;
+ if (masterTransferInfo.plainPort != 0) {
+ try {
+ downloader = new Downloader(Configuration.getMasterServerAddress(),
+ masterTransferInfo.plainPort, Constants.TRANSFER_TIMEOUT, null,
+ masterTransferInfo.token);
+ } catch (Exception e1) {
+ LOGGER.debug("Plain connect failed", e1);
+ downloader = null;
+ }
+ }
+ if (downloader == null && masterTransferInfo.sslPort != 0) {
+ try {
+ downloader = new Downloader(Configuration.getMasterServerAddress(),
+ masterTransferInfo.sslPort, Constants.TRANSFER_TIMEOUT, SSLContext.getDefault(), // TODO: Use the TLSv1.2 one once the master is ready
+ masterTransferInfo.token);
+ } catch (Exception e2) {
+ LOGGER.debug("SSL connect failed", e2);
+ downloader = null;
+ }
+ }
+ if (downloader == null) {
+ LOGGER.warn("Could not connect to master server for downloading " + image.imageName);
+ return;
+ }
+ addConnection(downloader, pool);
+ }
+ }
+
+ /**
+ * Set meta data for this image version.
+ *
+ * @param user
+ *
+ * @param data
+ */
+ public boolean setVersionData(UserInfo user, ImageVersionWrite data) {
+ synchronized (versionWrittenToDb) {
+ if (versionWrittenToDb.get()) {
+ return false;
+ }
+ if (!user.userId.equals(owner.userId)) {
+ return false;
+ }
+ versionSettings = data;
+ return true;
+ }
+ }
+
+ /**
+ * Add another connection for this file transfer. Currently only one
+ * connection is allowed, but this might change in the future.
+ *
+ * @param connection
+ * @return true if the connection is accepted, false if it should be
+ * discarded
+ */
+ public synchronized boolean addConnection(final Downloader connection, ThreadPoolExecutor pool) {
+ if (state == TransferState.FINISHED || state == TransferState.ERROR)
+ return false;
+ synchronized (downloads) {
+ if (downloads.size() >= MAX_CONNECTIONS)
+ return false;
+ downloads.add(connection);
+ }
+ try {
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ CbHandler cbh = new CbHandler();
+ if (!connection.download(cbh, cbh)) {
+ if (cbh.currentChunk != null) {
+ // If the download failed and we have a current chunk, put it back into
+ // the queue, so it will be handled again later...
+ chunks.markFailed(cbh.currentChunk);
+ }
+ LOGGER.warn("Download of " + tmpFileName.getAbsolutePath() + " failed");
+ }
+ lastActivityTime.set(System.currentTimeMillis());
+ synchronized (downloads) {
+ downloads.remove(connection);
+ }
+ if (chunks.isComplete()) {
+ finishUpload();
+ }
+ }
+ });
+ if (state == TransferState.IDLE) {
+ state = TransferState.WORKING;
+ }
+ } catch (Exception e) {
+ LOGGER.warn("threadpool rejected the incoming file transfer", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Write some data to the local file. Thread safe so we can
+ * have multiple concurrent connections.
+ *
+ * @param fileOffset
+ * @param dataLength
+ * @param data
+ * @return
+ */
+ private void writeFileData(long fileOffset, int dataLength, byte[] data) {
+ if (state != TransferState.WORKING)
+ throw new IllegalStateException("Cannot write to file if state != RUNNING");
+ synchronized (tmpFileHandle) {
+ try {
+ tmpFileHandle.seek(fileOffset);
+ tmpFileHandle.write(data, 0, dataLength);
+ } catch (IOException e) {
+ LOGGER.error("Cannot write to '" + tmpFileName
+ + "'. Disk full, network storage error, bad permissions, ...?", e);
+ fileWritable = false;
+ state = TransferState.ERROR;
+ }
+ }
+ }
+
+ /**
+ * Called when the upload finished.
+ */
+ private synchronized void finishUpload() {
+ synchronized (tmpFileHandle) {
+ if (state != TransferState.WORKING)
+ return;
+ Util.safeClose(tmpFileHandle);
+ state = TransferState.FINISHED;
+ }
+ potentialFinishTime.set(System.currentTimeMillis());
+ // If owner is not set, this was a repair-transfer, which downloads directly to the existing target file.
+ // Nothing more to do in that case.
+ if (isRepairUpload())
+ return;
+ LOGGER.info("Finalizing uploaded image " + image.imageName);
+ // Ready to go. First step: Rename temp file to something usable
+ File destination = new File(tmpFileName.getParent(), Formatter.vmName(owner, image.imageName));
+ // Sanity check: destination should be a sub directory of the vmStorePath
+ String relPath = FileSystem.getRelativePath(destination, Configuration.getVmStoreBasePath());
+ if (relPath == null) {
+ LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of "
+ + Configuration.getVmStoreBasePath().getAbsolutePath());
+ cancel();
+ return;
+ }
+ if (relPath.length() > 200) {
+ LOGGER.error("Generated file name is >200 chars. DB will not like it");
+ }
+
+ // Execute rename
+ boolean ret = false;
+ Exception renameException = null;
+ try {
+ ret = tmpFileName.renameTo(destination);
+ } catch (Exception e) {
+ ret = false;
+ renameException = e;
+ }
+ if (!ret) {
+ // Rename failed :-(
+ LOGGER.warn(
+ "Could not rename '" + tmpFileName.getAbsolutePath() + "' to '"
+ + destination.getAbsolutePath() + "'", renameException);
+ cancel();
+ return;
+ }
+
+ // Now insert meta data into DB
+ try {
+ synchronized (versionWrittenToDb) {
+ DbImage.createImageVersion(image.imageBaseId, getId(), owner, fileSize, relPath,
+ versionSettings, chunks, machineDescription);
+ versionWrittenToDb.set(true);
+ }
+ } catch (SQLException e) {
+ LOGGER.error("Error finishing upload: Inserting version to DB failed", e);
+ state = TransferState.ERROR;
+ // Also delete uploaded file, as there is no refence to it
+ FileSystem.deleteAsync(destination);
+ cancel();
+ return;
+ }
+ }
+
+ @Override
+ public synchronized void cancel() {
+ if (state != TransferState.FINISHED) {
+ state = TransferState.ERROR;
+ if (!isRepairUpload() && tmpFileName.exists()) {
+ FileSystem.deleteAsync(tmpFileName);
+ }
+ }
+ synchronized (downloads) {
+ for (Downloader download : downloads) {
+ download.cancel();
+ }
+ }
+ }
+
+ public boolean isRepairUpload() {
+ return owner == null;
+ }
+
+ /**
+ * Get user owning this upload. Can be null in special cases.
+ *
+ * @return instance of UserInfo for the according user.
+ */
+ public UserInfo getOwner() {
+ return this.owner;
+ }
+
+ public File getDestinationFile() {
+ return this.tmpFileName;
+ }
+
+ public long getSize() {
+ return this.fileSize;
+ }
+
+ /**
+ * Callback class for an instance of the Downloader, which supplies
+ * the Downloader with wanted file ranges, and handles incoming data.
+ */
+ private class CbHandler implements WantRangeCallback, DataReceivedCallback {
+ /**
+ * The current chunk being transfered.
+ */
+ private FileChunk currentChunk = null;
+ private byte[] buffer = new byte[FileChunk.CHUNK_SIZE];
+
+ @Override
+ public boolean dataReceived(long fileOffset, int dataLength, byte[] data) {
+ if (currentChunk == null)
+ throw new IllegalStateException("dataReceived without current chunk");
+ if (!currentChunk.range.contains(fileOffset, fileOffset + dataLength))
+ throw new IllegalStateException("dataReceived with file data out of range");
+ System.arraycopy(data, 0, buffer, (int) (fileOffset - currentChunk.range.startOffset), dataLength);
+ return fileWritable;
+ }
+
+ @Override
+ public FileRange get() {
+ if (currentChunk != null) {
+ if (hashChecker != null && currentChunk.getSha1Sum() != null) {
+ try {
+ hashChecker.queue(currentChunk, buffer, activeUpload);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ buffer = new byte[buffer.length];
+ } else {
+ writeFileData(currentChunk.range.startOffset, currentChunk.range.getLength(), buffer);
+ chunks.markSuccessful(currentChunk);
+ }
+ }
+ // Get next missing chunk
+ try {
+ currentChunk = chunks.getMissing();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ if (currentChunk == null) {
+ return null; // No more chunks, returning null tells the Downloader we're done.
+ }
+ return currentChunk.range;
+ }
+ }
+
+ public synchronized TransferStatus getStatus() {
+ return new TransferStatus(chunks.getStatusArray(), state);
+ }
+
+ @Override
+ public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) {
+ switch (result) {
+ case FAILURE:
+ LOGGER.warn("Hash check of chunk " + chunk.toString()
+ + " could not be executed. Assuming valid :-(");
+ // Fall through
+ case VALID:
+ writeFileData(chunk.range.startOffset, chunk.range.getLength(), data);
+ chunks.markSuccessful(chunk);
+ if (chunks.isComplete()) {
+ finishUpload();
+ }
+ break;
+ case INVALID:
+ LOGGER.warn("Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch "
+ + chunk.getFailCount() + "x :-(");
+ chunks.markFailed(chunk);
+ break;
+ }
+ }
+
+ private byte[] loadChunkFromFile(FileChunk chunk) {
+ synchronized (tmpFileHandle) {
+ if (state != TransferState.IDLE && state != TransferState.WORKING)
+ return null;
+ try {
+ tmpFileHandle.seek(chunk.range.startOffset);
+ byte[] buffer = new byte[chunk.range.getLength()];
+ tmpFileHandle.readFully(buffer);
+ return buffer;
+ } catch (IOException e) {
+ LOGGER.error(
+ "Could not read chunk " + chunk.getChunkIndex() + " of File "
+ + tmpFileName.toString(), e);
+ return null;
+ }
+ }
+ }
+
+ public void updateBlockHashList(List<byte[]> hashList) {
+ if (state != TransferState.IDLE && state != TransferState.WORKING)
+ return;
+ if (hashChecker == null)
+ return;
+ chunks.updateSha1Sums(hashList);
+ FileChunk chunk;
+ while (null != (chunk = chunks.getUnhashedComplete())) {
+ byte[] data = loadChunkFromFile(chunk);
+ if (data == null) {
+ LOGGER.warn("Will mark unloadable chunk as valid :-(");
+ chunks.markSuccessful(chunk);
+ continue;
+ }
+ try {
+ hashChecker.queue(chunk, data, this);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+
+ @Override
+ public boolean isActive() {
+ return state == TransferState.IDLE || state == TransferState.WORKING;
+ }
+
+ @Override
+ public int getActiveConnectionCount() {
+ return downloads.size();
+ }
+
+ @Override
+ protected void finalize() {
+ try {
+ Util.safeClose(tmpFileHandle);
+ if (tmpFileName.exists())
+ tmpFileName.delete();
+ } catch (Throwable t) {
+ }
+ }
+
+}