summaryrefslogtreecommitdiffstats
path: root/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
diff options
context:
space:
mode:
authorSimon Rettberg2016-04-13 18:39:26 +0200
committerSimon Rettberg2016-04-13 18:39:26 +0200
commit5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622 (patch)
tree5bdc5411cd9954577e5489d5e4271c800d826e9c /dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
parent[client] fix bad commit (diff)
downloadtutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.tar.gz
tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.tar.xz
tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.zip
(WiP) Global image sync
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java')
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java406
1 files changed, 50 insertions, 356 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
index 92f533e8..4ce8cc0a 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
@@ -3,11 +3,9 @@ package org.openslx.bwlp.sat.fileserv;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -15,12 +13,10 @@ import javax.net.ssl.SSLContext;
import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.database.mappers.DbImage;
-import org.openslx.bwlp.sat.thrift.ThriftUtil;
import org.openslx.bwlp.sat.util.Configuration;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.sat.util.FileSystem;
import org.openslx.bwlp.sat.util.Formatter;
-import org.openslx.bwlp.sat.util.Util;
import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.ImageVersionWrite;
@@ -28,43 +24,20 @@ import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.bwlp.thrift.iface.UserInfo;
-import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
-import org.openslx.filetransfer.FileRange;
-import org.openslx.filetransfer.WantRangeCallback;
-import org.openslx.filetransfer.util.ChunkList;
import org.openslx.filetransfer.util.FileChunk;
-import org.openslx.filetransfer.util.HashChecker;
-import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
-import org.openslx.filetransfer.util.HashChecker.HashResult;
+import org.openslx.filetransfer.util.IncomingTransferBase;
+import org.openslx.util.ThriftUtil;
import org.openslx.util.vm.DiskImage;
import org.openslx.util.vm.DiskImage.UnknownImageFormatException;
-public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback {
+public class IncomingDataTransfer extends IncomingTransferBase {
private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class);
private static final long MIN_FREE_SPACE_BYTES = FileChunk.CHUNK_SIZE * (2 + Constants.MAX_UPLOADS);
/**
- * Self reference for inner classes.
- */
- private final IncomingDataTransfer activeUpload = this;
-
- /**
- * Remote peer is uploading, so on our end, we have Downloaders
- */
- private List<Downloader> downloads = new ArrayList<>();
-
- private final File tmpFileName;
-
- private final RandomAccessFile tmpFileHandle;
-
- private final ChunkList chunks;
-
- private final long fileSize;
-
- /**
* User owning this uploaded file.
*/
private final UserInfo owner;
@@ -80,11 +53,6 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC
private ImageVersionWrite versionSettings = null;
/**
- * TransferState of this upload
- */
- private TransferState state = TransferState.IDLE;
-
- /**
* Description of this VM - binary dump of e.g. the *.vmx file (VMware)
*/
private final byte[] machineDescription;
@@ -96,44 +64,24 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC
private final AtomicBoolean versionWrittenToDb = new AtomicBoolean();
/**
- * Whether file is (still) writable. Used for the file transfer callbacks.
- */
- private boolean fileWritable = true;
-
- /**
* Set if this is a download from the master server
*/
private final TransferInformation masterTransferInfo;
- private static final HashChecker hashChecker;
-
- static {
- HashChecker hc;
- try {
- hc = new HashChecker("SHA-1", Constants.HASHCHECK_QUEUE_LEN);
- } catch (NoSuchAlgorithmException e) {
- hc = null;
- }
- hashChecker = hc;
- }
-
public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image,
File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription)
throws FileNotFoundException {
- super(uploadId);
- this.tmpFileName = destinationFile;
- this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw");
- this.chunks = new ChunkList(fileSize, hashChecker == null ? null : sha1Sums);
+ super(uploadId, destinationFile, fileSize, sha1Sums);
this.owner = owner;
this.image = image;
- this.fileSize = fileSize;
this.machineDescription = machineDescription;
this.masterTransferInfo = null;
}
public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo)
throws FileNotFoundException {
- super(publishData.imageVersionId);
+ super(UUID.randomUUID().toString(), tmpFile, publishData.fileSize,
+ ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes));
ImageDetailsRead idr = new ImageDetailsRead();
idr.setCreateTime(publishData.createTime);
idr.setDescription(publishData.description);
@@ -147,13 +95,8 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC
idr.setUpdaterId(publishData.user.userId);
idr.setUpdateTime(publishData.createTime);
idr.setVirtId(publishData.virtId);
- this.tmpFileName = tmpFile;
- this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw");
- this.chunks = new ChunkList(publishData.fileSize, hashChecker == null ? null
- : ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes));
this.owner = publishData.user;
this.image = idr;
- this.fileSize = publishData.fileSize;
this.machineDescription = ThriftUtil.unwrapByteBuffer(transferInfo.machineDescription);
this.masterTransferInfo = transferInfo;
this.versionSettings = new ImageVersionWrite(false);
@@ -167,10 +110,8 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC
if (masterTransferInfo == null)
return;
synchronized (this) {
- synchronized (downloads) {
- if (downloads.size() >= 1) // TODO What to pick here?
- return;
- }
+ if (getActiveConnectionCount() >= 1)
+ return;
Downloader downloader = null;
if (masterTransferInfo.plainPort != 0) {
try {
@@ -221,114 +162,33 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC
}
/**
- * Add another connection for this file transfer.
- *
- * @param connection
- * @return true if the connection is accepted, false if it should be
- * discarded
- */
- public synchronized boolean addConnection(final Downloader connection, ExecutorService pool) {
- if (state == TransferState.FINISHED || state == TransferState.ERROR)
- return false;
- synchronized (downloads) {
- if (downloads.size() >= Constants.MAX_CONNECTIONS_PER_TRANSFER)
- return false;
- downloads.add(connection);
- }
- try {
- pool.execute(new Runnable() {
- @Override
- public void run() {
- CbHandler cbh = new CbHandler(connection);
- if (!connection.download(cbh, cbh)) {
- if (cbh.currentChunk != null) {
- // If the download failed and we have a current chunk, put it back into
- // the queue, so it will be handled again later...
- chunks.markFailed(cbh.currentChunk);
- }
- LOGGER.warn("Download of " + tmpFileName.getAbsolutePath() + " failed");
- }
- if (state != TransferState.FINISHED && state != TransferState.ERROR) {
- lastActivityTime.set(System.currentTimeMillis());
- }
- synchronized (downloads) {
- downloads.remove(connection);
- }
- if (chunks.isComplete()) {
- finishUpload();
- }
- }
- });
- } catch (Exception e) {
- LOGGER.warn("threadpool rejected the incoming file transfer", e);
- synchronized (downloads) {
- downloads.remove(connection);
- }
- return false;
- }
- if (state == TransferState.IDLE) {
- state = TransferState.WORKING;
- }
- return true;
- }
-
- /**
- * Write some data to the local file. Thread safe so we can
- * have multiple concurrent connections.
- *
- * @param fileOffset
- * @param dataLength
- * @param data
- * @return
- */
- private void writeFileData(long fileOffset, int dataLength, byte[] data) {
- synchronized (tmpFileHandle) {
- if (state != TransferState.WORKING)
- throw new IllegalStateException("Cannot write to file if state != WORKING");
- try {
- tmpFileHandle.seek(fileOffset);
- tmpFileHandle.write(data, 0, dataLength);
- } catch (IOException e) {
- LOGGER.error("Cannot write to '" + tmpFileName
- + "'. Disk full, network storage error, bad permissions, ...?", e);
- fileWritable = false;
- }
- }
- if (!fileWritable) {
- cancel();
- }
- }
-
- /**
* Called when the upload finished.
*/
- private synchronized void finishUpload() {
- synchronized (tmpFileHandle) {
- if (state != TransferState.WORKING)
- return;
- Util.safeClose(tmpFileHandle);
- state = TransferState.FINISHED;
- }
+ @Override
+ protected synchronized boolean finishIncomingTransfer() {
+ if (getState() != TransferState.FINISHED)
+ return false;
potentialFinishTime.set(System.currentTimeMillis());
// If owner is not set, this was a repair-transfer, which downloads directly to the existing target file.
// Nothing more to do in that case.
if (isRepairUpload())
- return;
+ return true;
LOGGER.info("Finalizing uploaded image " + image.imageName);
// Ready to go. First step: Rename temp file to something usable
String ext = "img";
try {
- ext = new DiskImage(tmpFileName).format.extension;
+ ext = new DiskImage(getTmpFileName()).format.extension;
} catch (IOException | UnknownImageFormatException e1) {
}
- File destination = new File(tmpFileName.getParent(), Formatter.vmName(owner, image.imageName, ext));
+ File destination = new File(getTmpFileName().getParent(), Formatter.vmName(owner, image.imageName,
+ ext));
// Sanity check: destination should be a sub directory of the vmStorePath
String relPath = FileSystem.getRelativePath(destination, Configuration.getVmStoreBasePath());
if (relPath == null) {
LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of "
+ Configuration.getVmStoreBasePath().getAbsolutePath());
cancel();
- return;
+ return false;
}
if (relPath.length() > 200) {
LOGGER.error("Generated file name is >200 chars. DB will not like it");
@@ -338,7 +198,7 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC
boolean ret = false;
Exception renameException = null;
try {
- ret = tmpFileName.renameTo(destination);
+ ret = getTmpFileName().renameTo(destination);
} catch (Exception e) {
ret = false;
renameException = e;
@@ -346,43 +206,34 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC
if (!ret) {
// Rename failed :-(
LOGGER.warn(
- "Could not rename '" + tmpFileName.getAbsolutePath() + "' to '"
+ "Could not rename '" + getTmpFileName().getAbsolutePath() + "' to '"
+ destination.getAbsolutePath() + "'", renameException);
cancel();
- return;
+ return false;
}
// Now insert meta data into DB
try {
synchronized (versionWrittenToDb) {
- DbImage.createImageVersion(image.imageBaseId, getId(), owner, fileSize, relPath,
- versionSettings, chunks, machineDescription);
+ DbImage.createImageVersion(image.imageBaseId, getId(), owner, getFileSize(), relPath,
+ versionSettings, getChunks(), machineDescription);
versionWrittenToDb.set(true);
}
} catch (SQLException e) {
LOGGER.error("Error finishing upload: Inserting version to DB failed", e);
- state = TransferState.ERROR;
// Also delete uploaded file, as there is no reference to it
FileSystem.deleteAsync(destination);
cancel();
- return;
+ return false;
}
+ return true;
}
@Override
public synchronized void cancel() {
- if (state != TransferState.FINISHED && state != TransferState.ERROR) {
- state = TransferState.ERROR;
- }
- synchronized (downloads) {
- for (Downloader download : downloads) {
- download.cancel();
- }
- }
- lastActivityTime.set(0);
- Util.safeClose(tmpFileHandle);
- if (!isRepairUpload() && tmpFileName.exists()) {
- FileSystem.deleteAsync(tmpFileName);
+ super.cancel();
+ if (!isRepairUpload() && getTmpFileName().exists()) {
+ FileSystem.deleteAsync(getTmpFileName());
}
}
@@ -399,203 +250,46 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC
return this.owner;
}
- public File getDestinationFile() {
- return this.tmpFileName;
- }
-
- public long getSize() {
- return this.fileSize;
- }
-
- /**
- * Callback class for an instance of the Downloader, which supplies
- * the Downloader with wanted file ranges, and handles incoming data.
- */
- private class CbHandler implements WantRangeCallback, DataReceivedCallback {
- /**
- * The current chunk being transfered.
- */
- private FileChunk currentChunk = null;
- /**
- * Current buffer to receive to
- */
- private byte[] buffer = new byte[FileChunk.CHUNK_SIZE];
- /**
- * Downloader object
- */
- private final Downloader downloader;
-
- private CbHandler(Downloader downloader) {
- this.downloader = downloader;
- }
-
- @Override
- public boolean dataReceived(long fileOffset, int dataLength, byte[] data) {
- if (currentChunk == null)
- throw new IllegalStateException("dataReceived without current chunk");
- if (!currentChunk.range.contains(fileOffset, fileOffset + dataLength))
- throw new IllegalStateException("dataReceived with file data out of range");
- System.arraycopy(data, 0, buffer, (int) (fileOffset - currentChunk.range.startOffset), dataLength);
- return fileWritable;
- }
-
- @Override
- public FileRange get() {
- if (currentChunk != null) {
- if (hashChecker != null && currentChunk.getSha1Sum() != null) {
- try {
- hashChecker.queue(currentChunk, buffer, activeUpload);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
- }
- try {
- buffer = new byte[buffer.length];
- } catch (OutOfMemoryError e) {
- // Usually catching OOM errors is a bad idea, but it's quite safe here as
- // we know exactly where it happened, no hidden sub-calls through 20 objects.
- // The most likely cause here is that the hash checker/disk cannot keep up
- // writing out completed chunks, so we just sleep a bit and try again. If it still
- // fails, we exit completely.
- try {
- Thread.sleep(6000);
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
- return null;
- }
- // Might raise OOM again, but THIS TIME I MEAN IT
- try {
- buffer = new byte[buffer.length];
- } catch (OutOfMemoryError e2) {
- downloader.sendErrorCode("Out of RAM");
- cancel();
- }
- }
- } else {
- // We have no hash checker or the hash for the current chunk is unknown - flush to disk
- writeFileData(currentChunk.range.startOffset, currentChunk.range.getLength(), buffer);
- chunks.markSuccessful(currentChunk);
- }
- }
- // Get next missing chunk
- try {
- currentChunk = chunks.getMissing();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- cancel();
- return null;
- }
- if (currentChunk == null) {
- return null; // No more chunks, returning null tells the Downloader we're done.
- }
- // Check remaining disk space and abort if it's too low
- long space = FileSystem.getAvailableStorageBytes();
- if (space != -1 && space < MIN_FREE_SPACE_BYTES) {
- downloader.sendErrorCode("Out of disk space");
- LOGGER.error("Out of space: Cancelling upload of "
- + (image == null ? "image" : image.imageName) + " by "
- + Formatter.userFullName(owner));
- cancel();
- return null;
- }
- return currentChunk.range;
- }
- }
-
public synchronized TransferStatus getStatus() {
- return new TransferStatus(chunks.getStatusArray(), state);
+ return new TransferStatus(getChunks().getStatusArray(), getState());
}
@Override
- public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) {
- if (state != TransferState.IDLE && state != TransferState.WORKING)
- return;
- switch (result) {
- case FAILURE:
- LOGGER.warn("Hash check of chunk " + chunk.toString()
- + " could not be executed. Assuming valid :-(");
- // Fall through
- case VALID:
- if (!chunk.isWrittenToDisk()) {
- writeFileData(chunk.range.startOffset, chunk.range.getLength(), data);
- }
- chunks.markSuccessful(chunk);
- if (chunks.isComplete()) {
- finishUpload();
- }
- break;
- case INVALID:
- LOGGER.warn("Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch "
- + chunk.getFailCount() + "x :-(");
- chunks.markFailed(chunk);
- break;
- }
+ public boolean isActive() {
+ return getState() == TransferState.IDLE || getState() == TransferState.WORKING;
}
- private byte[] loadChunkFromFile(FileChunk chunk) {
- synchronized (tmpFileHandle) {
- if (state != TransferState.IDLE && state != TransferState.WORKING)
- return null;
- try {
- tmpFileHandle.seek(chunk.range.startOffset);
- byte[] buffer = new byte[chunk.range.getLength()];
- tmpFileHandle.readFully(buffer);
- return buffer;
- } catch (IOException e) {
- LOGGER.error(
- "Could not read chunk " + chunk.getChunkIndex() + " of File "
- + tmpFileName.toString(), e);
- return null;
- }
+ @Override
+ protected void finalize() {
+ try {
+ super.finalize();
+ } catch (Throwable t) {
+ }
+ try {
+ cancel();
+ } catch (Throwable t) {
}
}
- public void updateBlockHashList(List<byte[]> hashList) {
- if (state != TransferState.IDLE && state != TransferState.WORKING) {
- LOGGER.debug(this.getId() + ": Rejecting block hash list in state " + state);
- return;
- }
- if (hashChecker == null || hashList == null) {
- LOGGER.debug(this.getId() + ": Rejecting block hash list: No hasher");
- return;
- }
- chunks.updateSha1Sums(hashList);
- FileChunk chunk;
- while (null != (chunk = chunks.getUnhashedComplete())) {
- byte[] data = loadChunkFromFile(chunk);
- if (data == null) {
- LOGGER.warn("Will mark unloadable chunk as valid :-(");
- chunks.markSuccessful(chunk);
- continue;
- }
- try {
- hashChecker.queue(chunk, data, this);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
- }
- }
+ @Override
+ protected boolean hasEnoughFreeSpace() {
+ return FileSystem.getAvailableStorageBytes() > MIN_FREE_SPACE_BYTES;
}
@Override
- public boolean isActive() {
- return state == TransferState.IDLE || state == TransferState.WORKING;
+ public TransferInformation getTransferInfo() {
+ return new TransferInformation(getId(), FileServer.instance().getPlainPort(), FileServer.instance()
+ .getSslPort());
}
@Override
- public int getActiveConnectionCount() {
- return downloads.size();
+ public String getRelativePath() {
+ return FileSystem.getRelativePath(getTmpFileName(), Configuration.getVmStoreBasePath());
}
@Override
- protected void finalize() {
- try {
- Util.safeClose(tmpFileHandle);
- if (tmpFileName.exists()) {
- FileSystem.deleteAsync(tmpFileName);
- }
- } catch (Throwable t) {
- }
+ protected void chunkStatusChanged(FileChunk chunk) {
+ // TODO Update in DB in case this is a repair upload
}
}