diff options
author | Simon Rettberg | 2016-04-13 18:39:26 +0200 |
---|---|---|
committer | Simon Rettberg | 2016-04-13 18:39:26 +0200 |
commit | 5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622 (patch) | |
tree | 5bdc5411cd9954577e5489d5e4271c800d826e9c /dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java | |
parent | [client] fix bad commit (diff) | |
download | tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.tar.gz tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.tar.xz tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.zip |
(WiP) Global image sync
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java')
-rw-r--r-- | dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java | 406 |
1 files changed, 50 insertions, 356 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java index 92f533e8..4ce8cc0a 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java @@ -3,11 +3,9 @@ package org.openslx.bwlp.sat.fileserv; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; -import java.security.NoSuchAlgorithmException; import java.sql.SQLException; -import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -15,12 +13,10 @@ import javax.net.ssl.SSLContext; import org.apache.log4j.Logger; import org.openslx.bwlp.sat.database.mappers.DbImage; -import org.openslx.bwlp.sat.thrift.ThriftUtil; import org.openslx.bwlp.sat.util.Configuration; import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; -import org.openslx.bwlp.sat.util.Util; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.ImageVersionWrite; @@ -28,43 +24,20 @@ import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.bwlp.thrift.iface.UserInfo; -import org.openslx.filetransfer.DataReceivedCallback; import org.openslx.filetransfer.Downloader; -import org.openslx.filetransfer.FileRange; -import org.openslx.filetransfer.WantRangeCallback; -import org.openslx.filetransfer.util.ChunkList; import org.openslx.filetransfer.util.FileChunk; -import org.openslx.filetransfer.util.HashChecker; -import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; -import org.openslx.filetransfer.util.HashChecker.HashResult; +import org.openslx.filetransfer.util.IncomingTransferBase; +import org.openslx.util.ThriftUtil; import org.openslx.util.vm.DiskImage; import org.openslx.util.vm.DiskImage.UnknownImageFormatException; -public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback { +public class IncomingDataTransfer extends IncomingTransferBase { private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class); private static final long MIN_FREE_SPACE_BYTES = FileChunk.CHUNK_SIZE * (2 + Constants.MAX_UPLOADS); /** - * Self reference for inner classes. - */ - private final IncomingDataTransfer activeUpload = this; - - /** - * Remote peer is uploading, so on our end, we have Downloaders - */ - private List<Downloader> downloads = new ArrayList<>(); - - private final File tmpFileName; - - private final RandomAccessFile tmpFileHandle; - - private final ChunkList chunks; - - private final long fileSize; - - /** * User owning this uploaded file. */ private final UserInfo owner; @@ -80,11 +53,6 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC private ImageVersionWrite versionSettings = null; /** - * TransferState of this upload - */ - private TransferState state = TransferState.IDLE; - - /** * Description of this VM - binary dump of e.g. the *.vmx file (VMware) */ private final byte[] machineDescription; @@ -96,44 +64,24 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC private final AtomicBoolean versionWrittenToDb = new AtomicBoolean(); /** - * Whether file is (still) writable. Used for the file transfer callbacks. - */ - private boolean fileWritable = true; - - /** * Set if this is a download from the master server */ private final TransferInformation masterTransferInfo; - private static final HashChecker hashChecker; - - static { - HashChecker hc; - try { - hc = new HashChecker("SHA-1", Constants.HASHCHECK_QUEUE_LEN); - } catch (NoSuchAlgorithmException e) { - hc = null; - } - hashChecker = hc; - } - public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) throws FileNotFoundException { - super(uploadId); - this.tmpFileName = destinationFile; - this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw"); - this.chunks = new ChunkList(fileSize, hashChecker == null ? null : sha1Sums); + super(uploadId, destinationFile, fileSize, sha1Sums); this.owner = owner; this.image = image; - this.fileSize = fileSize; this.machineDescription = machineDescription; this.masterTransferInfo = null; } public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo) throws FileNotFoundException { - super(publishData.imageVersionId); + super(UUID.randomUUID().toString(), tmpFile, publishData.fileSize, + ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes)); ImageDetailsRead idr = new ImageDetailsRead(); idr.setCreateTime(publishData.createTime); idr.setDescription(publishData.description); @@ -147,13 +95,8 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC idr.setUpdaterId(publishData.user.userId); idr.setUpdateTime(publishData.createTime); idr.setVirtId(publishData.virtId); - this.tmpFileName = tmpFile; - this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw"); - this.chunks = new ChunkList(publishData.fileSize, hashChecker == null ? null - : ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes)); this.owner = publishData.user; this.image = idr; - this.fileSize = publishData.fileSize; this.machineDescription = ThriftUtil.unwrapByteBuffer(transferInfo.machineDescription); this.masterTransferInfo = transferInfo; this.versionSettings = new ImageVersionWrite(false); @@ -167,10 +110,8 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC if (masterTransferInfo == null) return; synchronized (this) { - synchronized (downloads) { - if (downloads.size() >= 1) // TODO What to pick here? - return; - } + if (getActiveConnectionCount() >= 1) + return; Downloader downloader = null; if (masterTransferInfo.plainPort != 0) { try { @@ -221,114 +162,33 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC } /** - * Add another connection for this file transfer. - * - * @param connection - * @return true if the connection is accepted, false if it should be - * discarded - */ - public synchronized boolean addConnection(final Downloader connection, ExecutorService pool) { - if (state == TransferState.FINISHED || state == TransferState.ERROR) - return false; - synchronized (downloads) { - if (downloads.size() >= Constants.MAX_CONNECTIONS_PER_TRANSFER) - return false; - downloads.add(connection); - } - try { - pool.execute(new Runnable() { - @Override - public void run() { - CbHandler cbh = new CbHandler(connection); - if (!connection.download(cbh, cbh)) { - if (cbh.currentChunk != null) { - // If the download failed and we have a current chunk, put it back into - // the queue, so it will be handled again later... - chunks.markFailed(cbh.currentChunk); - } - LOGGER.warn("Download of " + tmpFileName.getAbsolutePath() + " failed"); - } - if (state != TransferState.FINISHED && state != TransferState.ERROR) { - lastActivityTime.set(System.currentTimeMillis()); - } - synchronized (downloads) { - downloads.remove(connection); - } - if (chunks.isComplete()) { - finishUpload(); - } - } - }); - } catch (Exception e) { - LOGGER.warn("threadpool rejected the incoming file transfer", e); - synchronized (downloads) { - downloads.remove(connection); - } - return false; - } - if (state == TransferState.IDLE) { - state = TransferState.WORKING; - } - return true; - } - - /** - * Write some data to the local file. Thread safe so we can - * have multiple concurrent connections. - * - * @param fileOffset - * @param dataLength - * @param data - * @return - */ - private void writeFileData(long fileOffset, int dataLength, byte[] data) { - synchronized (tmpFileHandle) { - if (state != TransferState.WORKING) - throw new IllegalStateException("Cannot write to file if state != WORKING"); - try { - tmpFileHandle.seek(fileOffset); - tmpFileHandle.write(data, 0, dataLength); - } catch (IOException e) { - LOGGER.error("Cannot write to '" + tmpFileName - + "'. Disk full, network storage error, bad permissions, ...?", e); - fileWritable = false; - } - } - if (!fileWritable) { - cancel(); - } - } - - /** * Called when the upload finished. */ - private synchronized void finishUpload() { - synchronized (tmpFileHandle) { - if (state != TransferState.WORKING) - return; - Util.safeClose(tmpFileHandle); - state = TransferState.FINISHED; - } + @Override + protected synchronized boolean finishIncomingTransfer() { + if (getState() != TransferState.FINISHED) + return false; potentialFinishTime.set(System.currentTimeMillis()); // If owner is not set, this was a repair-transfer, which downloads directly to the existing target file. // Nothing more to do in that case. if (isRepairUpload()) - return; + return true; LOGGER.info("Finalizing uploaded image " + image.imageName); // Ready to go. First step: Rename temp file to something usable String ext = "img"; try { - ext = new DiskImage(tmpFileName).format.extension; + ext = new DiskImage(getTmpFileName()).format.extension; } catch (IOException | UnknownImageFormatException e1) { } - File destination = new File(tmpFileName.getParent(), Formatter.vmName(owner, image.imageName, ext)); + File destination = new File(getTmpFileName().getParent(), Formatter.vmName(owner, image.imageName, + ext)); // Sanity check: destination should be a sub directory of the vmStorePath String relPath = FileSystem.getRelativePath(destination, Configuration.getVmStoreBasePath()); if (relPath == null) { LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of " + Configuration.getVmStoreBasePath().getAbsolutePath()); cancel(); - return; + return false; } if (relPath.length() > 200) { LOGGER.error("Generated file name is >200 chars. DB will not like it"); @@ -338,7 +198,7 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC boolean ret = false; Exception renameException = null; try { - ret = tmpFileName.renameTo(destination); + ret = getTmpFileName().renameTo(destination); } catch (Exception e) { ret = false; renameException = e; @@ -346,43 +206,34 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC if (!ret) { // Rename failed :-( LOGGER.warn( - "Could not rename '" + tmpFileName.getAbsolutePath() + "' to '" + "Could not rename '" + getTmpFileName().getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'", renameException); cancel(); - return; + return false; } // Now insert meta data into DB try { synchronized (versionWrittenToDb) { - DbImage.createImageVersion(image.imageBaseId, getId(), owner, fileSize, relPath, - versionSettings, chunks, machineDescription); + DbImage.createImageVersion(image.imageBaseId, getId(), owner, getFileSize(), relPath, + versionSettings, getChunks(), machineDescription); versionWrittenToDb.set(true); } } catch (SQLException e) { LOGGER.error("Error finishing upload: Inserting version to DB failed", e); - state = TransferState.ERROR; // Also delete uploaded file, as there is no reference to it FileSystem.deleteAsync(destination); cancel(); - return; + return false; } + return true; } @Override public synchronized void cancel() { - if (state != TransferState.FINISHED && state != TransferState.ERROR) { - state = TransferState.ERROR; - } - synchronized (downloads) { - for (Downloader download : downloads) { - download.cancel(); - } - } - lastActivityTime.set(0); - Util.safeClose(tmpFileHandle); - if (!isRepairUpload() && tmpFileName.exists()) { - FileSystem.deleteAsync(tmpFileName); + super.cancel(); + if (!isRepairUpload() && getTmpFileName().exists()) { + FileSystem.deleteAsync(getTmpFileName()); } } @@ -399,203 +250,46 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC return this.owner; } - public File getDestinationFile() { - return this.tmpFileName; - } - - public long getSize() { - return this.fileSize; - } - - /** - * Callback class for an instance of the Downloader, which supplies - * the Downloader with wanted file ranges, and handles incoming data. - */ - private class CbHandler implements WantRangeCallback, DataReceivedCallback { - /** - * The current chunk being transfered. - */ - private FileChunk currentChunk = null; - /** - * Current buffer to receive to - */ - private byte[] buffer = new byte[FileChunk.CHUNK_SIZE]; - /** - * Downloader object - */ - private final Downloader downloader; - - private CbHandler(Downloader downloader) { - this.downloader = downloader; - } - - @Override - public boolean dataReceived(long fileOffset, int dataLength, byte[] data) { - if (currentChunk == null) - throw new IllegalStateException("dataReceived without current chunk"); - if (!currentChunk.range.contains(fileOffset, fileOffset + dataLength)) - throw new IllegalStateException("dataReceived with file data out of range"); - System.arraycopy(data, 0, buffer, (int) (fileOffset - currentChunk.range.startOffset), dataLength); - return fileWritable; - } - - @Override - public FileRange get() { - if (currentChunk != null) { - if (hashChecker != null && currentChunk.getSha1Sum() != null) { - try { - hashChecker.queue(currentChunk, buffer, activeUpload); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } - try { - buffer = new byte[buffer.length]; - } catch (OutOfMemoryError e) { - // Usually catching OOM errors is a bad idea, but it's quite safe here as - // we know exactly where it happened, no hidden sub-calls through 20 objects. - // The most likely cause here is that the hash checker/disk cannot keep up - // writing out completed chunks, so we just sleep a bit and try again. If it still - // fails, we exit completely. - try { - Thread.sleep(6000); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - return null; - } - // Might raise OOM again, but THIS TIME I MEAN IT - try { - buffer = new byte[buffer.length]; - } catch (OutOfMemoryError e2) { - downloader.sendErrorCode("Out of RAM"); - cancel(); - } - } - } else { - // We have no hash checker or the hash for the current chunk is unknown - flush to disk - writeFileData(currentChunk.range.startOffset, currentChunk.range.getLength(), buffer); - chunks.markSuccessful(currentChunk); - } - } - // Get next missing chunk - try { - currentChunk = chunks.getMissing(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - cancel(); - return null; - } - if (currentChunk == null) { - return null; // No more chunks, returning null tells the Downloader we're done. - } - // Check remaining disk space and abort if it's too low - long space = FileSystem.getAvailableStorageBytes(); - if (space != -1 && space < MIN_FREE_SPACE_BYTES) { - downloader.sendErrorCode("Out of disk space"); - LOGGER.error("Out of space: Cancelling upload of " - + (image == null ? "image" : image.imageName) + " by " - + Formatter.userFullName(owner)); - cancel(); - return null; - } - return currentChunk.range; - } - } - public synchronized TransferStatus getStatus() { - return new TransferStatus(chunks.getStatusArray(), state); + return new TransferStatus(getChunks().getStatusArray(), getState()); } @Override - public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) { - if (state != TransferState.IDLE && state != TransferState.WORKING) - return; - switch (result) { - case FAILURE: - LOGGER.warn("Hash check of chunk " + chunk.toString() - + " could not be executed. Assuming valid :-("); - // Fall through - case VALID: - if (!chunk.isWrittenToDisk()) { - writeFileData(chunk.range.startOffset, chunk.range.getLength(), data); - } - chunks.markSuccessful(chunk); - if (chunks.isComplete()) { - finishUpload(); - } - break; - case INVALID: - LOGGER.warn("Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch " - + chunk.getFailCount() + "x :-("); - chunks.markFailed(chunk); - break; - } + public boolean isActive() { + return getState() == TransferState.IDLE || getState() == TransferState.WORKING; } - private byte[] loadChunkFromFile(FileChunk chunk) { - synchronized (tmpFileHandle) { - if (state != TransferState.IDLE && state != TransferState.WORKING) - return null; - try { - tmpFileHandle.seek(chunk.range.startOffset); - byte[] buffer = new byte[chunk.range.getLength()]; - tmpFileHandle.readFully(buffer); - return buffer; - } catch (IOException e) { - LOGGER.error( - "Could not read chunk " + chunk.getChunkIndex() + " of File " - + tmpFileName.toString(), e); - return null; - } + @Override + protected void finalize() { + try { + super.finalize(); + } catch (Throwable t) { + } + try { + cancel(); + } catch (Throwable t) { } } - public void updateBlockHashList(List<byte[]> hashList) { - if (state != TransferState.IDLE && state != TransferState.WORKING) { - LOGGER.debug(this.getId() + ": Rejecting block hash list in state " + state); - return; - } - if (hashChecker == null || hashList == null) { - LOGGER.debug(this.getId() + ": Rejecting block hash list: No hasher"); - return; - } - chunks.updateSha1Sums(hashList); - FileChunk chunk; - while (null != (chunk = chunks.getUnhashedComplete())) { - byte[] data = loadChunkFromFile(chunk); - if (data == null) { - LOGGER.warn("Will mark unloadable chunk as valid :-("); - chunks.markSuccessful(chunk); - continue; - } - try { - hashChecker.queue(chunk, data, this); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - } + @Override + protected boolean hasEnoughFreeSpace() { + return FileSystem.getAvailableStorageBytes() > MIN_FREE_SPACE_BYTES; } @Override - public boolean isActive() { - return state == TransferState.IDLE || state == TransferState.WORKING; + public TransferInformation getTransferInfo() { + return new TransferInformation(getId(), FileServer.instance().getPlainPort(), FileServer.instance() + .getSslPort()); } @Override - public int getActiveConnectionCount() { - return downloads.size(); + public String getRelativePath() { + return FileSystem.getRelativePath(getTmpFileName(), Configuration.getVmStoreBasePath()); } @Override - protected void finalize() { - try { - Util.safeClose(tmpFileHandle); - if (tmpFileName.exists()) { - FileSystem.deleteAsync(tmpFileName); - } - } catch (Throwable t) { - } + protected void chunkStatusChanged(FileChunk chunk) { + // TODO Update in DB in case this is a repair upload } } |