package org.openslx.bwlp.sat.fileserv;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.database.mappers.DbImage;
import org.openslx.bwlp.sat.thrift.ThriftUtil;
import org.openslx.bwlp.sat.util.Configuration;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.sat.util.FileSystem;
import org.openslx.bwlp.sat.util.Formatter;
import org.openslx.bwlp.sat.util.Util;
import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.ImageVersionWrite;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.bwlp.thrift.iface.UserInfo;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.filetransfer.util.ChunkList;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.filetransfer.util.HashChecker;
import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
import org.openslx.filetransfer.util.HashChecker.HashResult;
import org.openslx.util.vm.DiskImage;
import org.openslx.util.vm.DiskImage.UnknownImageFormatException;
public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback {
private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class);
/**
 * Self reference for inner classes.
 */
private final IncomingDataTransfer activeUpload = this;
/**
 * Remote peer is uploading, so on our end, we have Downloaders.
 * The list is also used as the monitor guarding access to it, hence final.
 */
private final List<Downloader> downloads = new ArrayList<>();
/**
 * File the incoming data is written to.
 */
private final File tmpFileName;
/**
 * Open read/write handle on {@link #tmpFileName}; also used as the monitor
 * serializing file access.
 */
private final RandomAccessFile tmpFileHandle;
/**
 * Bookkeeping of the chunks of the file being received.
 */
private final ChunkList chunks;
/**
 * Size of the file being received, as announced when the transfer was created.
 */
private final long fileSize;
/**
 * User owning this uploaded file.
 */
private final UserInfo owner;
/**
 * Base image this upload is a new version for.
 */
private final ImageDetailsRead image;
/**
 * Flags to set for this new image version. Optional field.
 */
private ImageVersionWrite versionSettings = null;
/**
 * TransferState of this upload. Volatile because it is read without holding
 * a lock by worker threads and by the hash checker callback.
 */
private volatile TransferState state = TransferState.IDLE;
/**
 * Description of this VM - binary dump of e.g. the *.vmx file (VMware)
 */
private final byte[] machineDescription;
/**
 * Indicates whether the version information was written to db already.
 * Disallow setVersionData in that case.
 */
private final AtomicBoolean versionWrittenToDb = new AtomicBoolean();
/**
 * Whether file is (still) writable. Used for the file transfer callbacks.
 * Volatile because it is written under the file lock but read without it.
 */
private volatile boolean fileWritable = true;
/**
 * Set if this is a download from the master server
 */
private final TransferInformation masterTransferInfo;
/**
 * Shared hash checker, or null if it could not be created.
 */
private static final HashChecker hashChecker;
/**
 * Receiving is aborted once less than this many bytes of storage are available.
 */
private static final long MIN_FREE_SPACE_BYTES = 64L * 1024L * 1024L;
// Create the shared hash checker. If SHA-1 is unavailable the checker stays
// null, in which case chunk lists are created without checksums and received
// chunks are written to disk unverified.
static {
    HashChecker hc;
    try {
        hc = new HashChecker("SHA-1", Constants.HASHCHECK_QUEUE_LEN);
    } catch (NoSuchAlgorithmException e) {
        // Do not fail class loading, but leave a trace that verification is off
        LOGGER.warn("SHA-1 not available, incoming chunks will not be hash checked", e);
        hc = null;
    }
    hashChecker = hc;
}
public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image,
File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription)
throws FileNotFoundException {
super(uploadId);
this.tmpFileName = destinationFile;
this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw");
this.chunks = new ChunkList(fileSize, hashChecker == null ? null : sha1Sums);
this.owner = owner;
this.image = image;
this.fileSize = fileSize;
this.machineDescription = machineDescription;
this.masterTransferInfo = null;
}
/**
 * Creates a transfer that fetches an image version from the master server.
 * The image details are assembled from the given publish data; the version
 * settings are pre-set to {@code new ImageVersionWrite(false)}.
 *
 * @param publishData meta data of the image version to fetch
 * @param tmpFile file to receive the data into; opened read/write
 * @param transferInfo connection details, block hashes and machine
 *            description supplied for this transfer
 * @throws FileNotFoundException if the temporary file cannot be opened
 */
public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo)
        throws FileNotFoundException {
    super(publishData.imageVersionId);
    // Build the image details from what the publish data tells us
    final ImageDetailsRead details = new ImageDetailsRead();
    details.setImageBaseId(publishData.imageBaseId);
    details.setLatestVersionId(publishData.imageVersionId);
    details.setImageName(publishData.imageName);
    details.setDescription(publishData.description);
    details.setTags(publishData.tags);
    details.setOsId(publishData.osId);
    details.setVirtId(publishData.virtId);
    details.setIsTemplate(publishData.isTemplate);
    // Creation and last update coincide, and so do owner and updater
    details.setCreateTime(publishData.createTime);
    details.setUpdateTime(publishData.createTime);
    details.setOwnerId(publishData.user.userId);
    details.setUpdaterId(publishData.user.userId);
    this.image = details;
    this.owner = publishData.user;
    this.fileSize = publishData.fileSize;
    this.masterTransferInfo = transferInfo;
    this.versionSettings = new ImageVersionWrite(false);
    this.tmpFileName = tmpFile;
    this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw");
    if (hashChecker == null) {
        this.chunks = new ChunkList(publishData.fileSize, null);
    } else {
        this.chunks = new ChunkList(publishData.fileSize,
                ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes));
    }
    this.machineDescription = ThriftUtil.unwrapByteBuffer(transferInfo.machineDescription);
}
/**
 * Periodic keep-alive for transfers from the master server. Does nothing
 * for other transfers or while a connection is still registered; otherwise
 * tries to connect to the master server (plain port first, then SSL) and
 * registers the resulting connection.
 *
 * @param pool thread pool the new connection's worker is run on
 */
public void heartBeat(ThreadPoolExecutor pool) {
    if (masterTransferInfo == null) {
        return;
    }
    synchronized (this) {
        synchronized (downloads) {
            if (!downloads.isEmpty()) { // TODO What to pick here?
                return;
            }
        }
        Downloader masterConnection = null;
        // First choice: unencrypted port, if offered
        if (masterTransferInfo.plainPort != 0) {
            try {
                masterConnection = new Downloader(Configuration.getMasterServerAddress(),
                        masterTransferInfo.plainPort, Constants.TRANSFER_TIMEOUT, null,
                        masterTransferInfo.token);
            } catch (Exception plainFailure) {
                LOGGER.debug("Plain connect failed", plainFailure);
                masterConnection = null;
            }
        }
        // Second choice: SSL port, if offered and plain did not work out
        if (masterConnection == null && masterTransferInfo.sslPort != 0) {
            try {
                // TODO: Use the TLSv1.2 one once the master is ready
                masterConnection = new Downloader(Configuration.getMasterServerAddress(),
                        masterTransferInfo.sslPort, Constants.TRANSFER_TIMEOUT, SSLContext.getDefault(),
                        masterTransferInfo.token);
            } catch (Exception sslFailure) {
                LOGGER.debug("SSL connect failed", sslFailure);
                masterConnection = null;
            }
        }
        if (masterConnection != null) {
            addConnection(masterConnection, pool);
        } else {
            LOGGER.warn("Could not connect to master server for downloading " + image.imageName);
        }
    }
}
/**
 * Set meta data for this image version. Rejected once the version has been
 * written to the database, and for anyone but the owner of this upload.
 *
 * @param user user requesting the change
 * @param data version settings to apply when the upload is finalized
 * @return true if the settings were accepted, false otherwise
 */
public boolean setVersionData(UserInfo user, ImageVersionWrite data) {
    synchronized (versionWrittenToDb) {
        if (versionWrittenToDb.get()) {
            return false;
        }
        // The owner can be null (repair upload); nobody may set version data
        // then. Also reject callers without an id instead of failing with an NPE.
        if (owner == null || user == null || user.userId == null) {
            return false;
        }
        if (!user.userId.equals(owner.userId)) {
            return false;
        }
        versionSettings = data;
        return true;
    }
}
/**
 * Add another connection for this file transfer. The number of concurrent
 * connections is limited by Constants.MAX_CONNECTIONS_PER_TRANSFER.
 *
 * @param connection connection to receive data from
 * @param pool thread pool to run the download worker on
 * @return true if the connection is accepted, false if it should be
 *         discarded (transfer already finished or failed, too many
 *         connections, or the pool refused the task)
 */
public synchronized boolean addConnection(final Downloader connection, ThreadPoolExecutor pool) {
    if (state == TransferState.FINISHED || state == TransferState.ERROR)
        return false;
    synchronized (downloads) {
        if (downloads.size() >= Constants.MAX_CONNECTIONS_PER_TRANSFER)
            return false;
        downloads.add(connection);
    }
    try {
        pool.execute(new Runnable() {
            @Override
            public void run() {
                CbHandler cbh = new CbHandler(connection);
                boolean succeeded = false;
                try {
                    succeeded = connection.download(cbh, cbh);
                } finally {
                    // The clean-up below must also run if download() throws;
                    // otherwise the connection would occupy a slot in
                    // 'downloads' forever and block any further connection.
                    try {
                        if (!succeeded) {
                            if (cbh.currentChunk != null) {
                                // If the download failed and we have a current chunk, put it back into
                                // the queue, so it will be handled again later...
                                chunks.markFailed(cbh.currentChunk);
                            }
                            LOGGER.warn("Download of " + tmpFileName.getAbsolutePath() + " failed");
                        }
                        if (state != TransferState.FINISHED && state != TransferState.ERROR) {
                            lastActivityTime.set(System.currentTimeMillis());
                        }
                    } finally {
                        synchronized (downloads) {
                            downloads.remove(connection);
                        }
                    }
                }
                if (chunks.isComplete()) {
                    finishUpload();
                }
            }
        });
    } catch (Exception e) {
        LOGGER.warn("threadpool rejected the incoming file transfer", e);
        // Worker never started, so undo the registration from above
        synchronized (downloads) {
            downloads.remove(connection);
        }
        return false;
    }
    if (state == TransferState.IDLE) {
        state = TransferState.WORKING;
    }
    return true;
}
/**
 * Write some data to the local file. File access is serialized on the file
 * handle, so multiple concurrent connections can call this.
 * If writing fails, the file is flagged as not writable and the transfer
 * is cancelled.
 *
 * @param fileOffset offset in the file to write the data to
 * @param dataLength number of bytes of data to write
 * @param data buffer holding the bytes to write, starting at index 0
 * @throws IllegalStateException if the transfer is not in state WORKING
 */
private void writeFileData(long fileOffset, int dataLength, byte[] data) {
    synchronized (tmpFileHandle) {
        if (state == TransferState.WORKING) {
            try {
                tmpFileHandle.seek(fileOffset);
                tmpFileHandle.write(data, 0, dataLength);
            } catch (IOException e) {
                LOGGER.error("Cannot write to '" + tmpFileName
                        + "'. Disk full, network storage error, bad permissions, ...?", e);
                fileWritable = false;
            }
        } else {
            throw new IllegalStateException("Cannot write to file if state != WORKING");
        }
    }
    // cancel() locks on this instance, so it is called only after the file
    // lock has been released
    if (fileWritable) {
        return;
    }
    cancel();
}
/**
 * Called when the upload finished. Closes the file and, unless this is a
 * repair upload, renames the received file to its final name and registers
 * the new image version in the database. If any of these finalization
 * steps fail, the transfer ends up in state ERROR.
 */
private synchronized void finishUpload() {
    synchronized (tmpFileHandle) {
        if (state != TransferState.WORKING)
            return;
        Util.safeClose(tmpFileHandle);
        state = TransferState.FINISHED;
    }
    potentialFinishTime.set(System.currentTimeMillis());
    // If owner is not set, this was a repair-transfer, which downloads directly to the existing target file.
    // Nothing more to do in that case.
    if (isRepairUpload())
        return;
    LOGGER.info("Finalizing uploaded image " + image.imageName);
    // Ready to go. First step: Rename temp file to something usable
    String ext = "img";
    try {
        ext = new DiskImage(tmpFileName).format.extension;
    } catch (IOException | UnknownImageFormatException e1) {
        // Not fatal, the generic extension is used instead
        LOGGER.debug("Could not determine image format of " + tmpFileName.getAbsolutePath()
                + ", using extension '" + ext + "'", e1);
    }
    File destination = new File(tmpFileName.getParent(), Formatter.vmName(owner, image.imageName, ext));
    // Sanity check: destination should be a sub directory of the vmStorePath
    String relPath = FileSystem.getRelativePath(destination, Configuration.getVmStoreBasePath());
    if (relPath == null) {
        LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of "
                + Configuration.getVmStoreBasePath().getAbsolutePath());
        abortFinalization(tmpFileName);
        return;
    }
    if (relPath.length() > 200) {
        LOGGER.error("Generated file name is >200 chars. DB will not like it");
    }
    // Execute rename
    boolean ret = false;
    Exception renameException = null;
    try {
        ret = tmpFileName.renameTo(destination);
    } catch (Exception e) {
        ret = false;
        renameException = e;
    }
    if (!ret) {
        // Rename failed :-(
        LOGGER.warn(
                "Could not rename '" + tmpFileName.getAbsolutePath() + "' to '"
                        + destination.getAbsolutePath() + "'", renameException);
        abortFinalization(tmpFileName);
        return;
    }
    // Now insert meta data into DB
    try {
        synchronized (versionWrittenToDb) {
            DbImage.createImageVersion(image.imageBaseId, getId(), owner, fileSize, relPath,
                    versionSettings, chunks, machineDescription);
            versionWrittenToDb.set(true);
        }
    } catch (SQLException e) {
        LOGGER.error("Error finishing upload: Inserting version to DB failed", e);
        // Also delete uploaded file, as there is no reference to it
        abortFinalization(destination);
        return;
    }
}
/**
 * Puts the transfer into state ERROR after a failed finalization step.
 * The state was already switched to FINISHED at this point, so cancel()
 * alone would neither change it nor remove the received file.
 *
 * @param leftover the received file at its current location, to be deleted
 */
private void abortFinalization(File leftover) {
    state = TransferState.ERROR;
    if (leftover.exists()) {
        FileSystem.deleteAsync(leftover);
    }
    cancel();
}
/**
 * Cancels this transfer. Unless it already finished or failed, it is put
 * into state ERROR and the received file is scheduled for deletion (not
 * for repair uploads). In any case all registered connections are
 * cancelled and the last activity time is reset to 0.
 */
@Override
public synchronized void cancel() {
    final boolean alreadyOver = (state == TransferState.FINISHED || state == TransferState.ERROR);
    if (!alreadyOver) {
        state = TransferState.ERROR;
        final boolean removeFile = !isRepairUpload() && tmpFileName.exists();
        if (removeFile) {
            FileSystem.deleteAsync(tmpFileName);
        }
    }
    synchronized (downloads) {
        for (Downloader connection : downloads) {
            connection.cancel();
        }
    }
    lastActivityTime.set(0);
}
/**
 * Tells whether this is a repair upload, which is the case if no owner is
 * set.
 *
 * @return true if this transfer has no owner
 */
public boolean isRepairUpload() {
    return null == this.owner;
}
/**
 * Returns the user this upload belongs to. May be null in special cases.
 *
 * @return the owning user's UserInfo instance
 */
public UserInfo getOwner() {
    return owner;
}
/**
 * Returns the file the incoming data is written to.
 *
 * @return the file this transfer was created with
 */
public File getDestinationFile() {
    return tmpFileName;
}
/**
 * Returns the size of the file being received.
 *
 * @return file size as given when the transfer was created
 */
public long getSize() {
    return fileSize;
}
/**
 * Callback class for an instance of the Downloader, which supplies
 * the Downloader with wanted file ranges, and handles incoming data.
 * Incoming data of the current chunk is collected in a buffer; once the
 * next range is requested, the buffer is either handed to the hash checker
 * or written to the file directly.
 */
private class CbHandler implements WantRangeCallback, DataReceivedCallback {
    /**
     * The current chunk being transferred.
     */
    private FileChunk currentChunk = null;
    /**
     * Current buffer to receive to
     */
    private byte[] buffer = new byte[FileChunk.CHUNK_SIZE];
    /**
     * Downloader object
     */
    private final Downloader downloader;

    private CbHandler(Downloader downloader) {
        this.downloader = downloader;
    }

    /**
     * Copies received data into the buffer of the current chunk.
     *
     * @param fileOffset offset of the data within the file
     * @param dataLength number of valid bytes in data
     * @param data the received bytes
     * @return whether the file is still writable
     * @throws IllegalStateException if there is no current chunk, or the
     *             data lies outside of it
     */
    @Override
    public boolean dataReceived(long fileOffset, int dataLength, byte[] data) {
        if (currentChunk == null)
            throw new IllegalStateException("dataReceived without current chunk");
        if (!currentChunk.range.contains(fileOffset, fileOffset + dataLength))
            throw new IllegalStateException("dataReceived with file data out of range");
        System.arraycopy(data, 0, buffer, (int) (fileOffset - currentChunk.range.startOffset), dataLength);
        return fileWritable;
    }

    /**
     * Processes the chunk received last (if any) and picks the next one.
     *
     * @return range of the next chunk to transfer, or null if there is
     *         nothing left to request or the transfer has to stop
     */
    @Override
    public FileRange get() {
        if (currentChunk != null) {
            if (hashChecker != null && currentChunk.getSha1Sum() != null) {
                try {
                    hashChecker.queue(currentChunk, buffer, activeUpload);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
                // The queued buffer now belongs to the hash checker, so a
                // fresh one is needed for the next chunk
                try {
                    buffer = new byte[buffer.length];
                } catch (OutOfMemoryError e) {
                    // Usually catching OOM errors is a bad idea, but it's quite safe here as
                    // we know exactly where it happened, no hidden sub-calls through 20 objects.
                    // The most likely cause here is that the hash checker/disk cannot keep up
                    // writing out completed chunks, so we just sleep a bit and try again. If it still
                    // fails, we exit completely.
                    try {
                        Thread.sleep(6000);
                    } catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                        return null;
                    }
                    // Might raise OOM again, but THIS TIME I MEAN IT
                    try {
                        buffer = new byte[buffer.length];
                    } catch (OutOfMemoryError e2) {
                        downloader.sendErrorCode("Out of RAM");
                        cancel();
                        // Must not go on: 'buffer' is still the array that was
                        // queued for hashing, receiving into it would corrupt it
                        return null;
                    }
                }
            } else {
                // No checksum to verify against, write the chunk right away
                writeFileData(currentChunk.range.startOffset, currentChunk.range.getLength(), buffer);
                chunks.markSuccessful(currentChunk);
            }
        }
        // Get next missing chunk
        try {
            currentChunk = chunks.getMissing();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            cancel();
            return null;
        }
        if (currentChunk == null) {
            return null; // No more chunks, returning null tells the Downloader we're done.
        }
        // Check remaining disk space and abort if it's too low
        if (FileSystem.getAvailableStorageBytes() < MIN_FREE_SPACE_BYTES) {
            downloader.sendErrorCode("Out of disk space");
            cancel();
            return null;
        }
        return currentChunk.range;
    }
}
/**
 * Returns a snapshot of this transfer's progress.
 *
 * @return status made up of the chunk list's status array and the
 *         current transfer state
 */
public synchronized TransferStatus getStatus() {
    final TransferStatus snapshot = new TransferStatus(chunks.getStatusArray(), state);
    return snapshot;
}
/**
 * Callback of the hash checker. Ignored unless the transfer is idle or
 * working. A chunk with a mismatching hash is marked as failed; a valid
 * chunk, or one whose check could not be executed, is written to the file
 * and marked as successful, finishing the upload if it was the last one.
 *
 * @param result outcome of the hash check
 * @param data content of the checked chunk
 * @param chunk the chunk that was checked
 */
@Override
public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) {
    final boolean stillRunning = (state == TransferState.IDLE || state == TransferState.WORKING);
    if (!stillRunning) {
        return;
    }
    switch (result) {
    case INVALID:
        LOGGER.warn("Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch "
                + chunk.getFailCount() + "x :-(");
        chunks.markFailed(chunk);
        break;
    case FAILURE:
        LOGGER.warn("Hash check of chunk " + chunk.toString()
                + " could not be executed. Assuming valid :-(");
        // Fall through - treated just like a valid chunk
    case VALID:
        writeFileData(chunk.range.startOffset, chunk.range.getLength(), data);
        chunks.markSuccessful(chunk);
        if (chunks.isComplete()) {
            finishUpload();
        }
        break;
    }
}
/**
 * Reads the content of the given chunk back from the local file.
 *
 * @param chunk chunk whose range is to be read
 * @return the chunk's bytes, or null if the transfer is neither idle nor
 *         working, or reading failed
 */
private byte[] loadChunkFromFile(FileChunk chunk) {
    synchronized (tmpFileHandle) {
        if (state == TransferState.IDLE || state == TransferState.WORKING) {
            try {
                tmpFileHandle.seek(chunk.range.startOffset);
                final byte[] content = new byte[chunk.range.getLength()];
                tmpFileHandle.readFully(content);
                return content;
            } catch (IOException e) {
                LOGGER.error(
                        "Could not read chunk " + chunk.getChunkIndex() + " of File "
                                + tmpFileName.toString(), e);
            }
        }
        return null;
    }
}
/**
 * Updates the checksums of the chunk list and queues every chunk reported
 * by it as complete but unhashed for a hash check. Does nothing if the
 * transfer is neither idle nor working, if no hash checker is available,
 * or if the given list is null.
 *
 * @param hashList new list of chunk checksums
 */
public void updateBlockHashList(List<byte[]> hashList) {
    final boolean running = (state == TransferState.IDLE || state == TransferState.WORKING);
    if (!running) {
        return;
    }
    if (hashChecker == null || hashList == null) {
        return;
    }
    chunks.updateSha1Sums(hashList);
    for (FileChunk pending = chunks.getUnhashedComplete(); pending != null; pending = chunks
            .getUnhashedComplete()) {
        final byte[] content = loadChunkFromFile(pending);
        if (content != null) {
            try {
                hashChecker.queue(pending, content, this);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        } else {
            // Cannot verify what cannot be read back
            LOGGER.warn("Will mark unloadable chunk as valid :-(");
            chunks.markSuccessful(pending);
        }
    }
}
/**
 * Tells whether this transfer is still going on.
 *
 * @return true if the state is IDLE or WORKING
 */
@Override
public boolean isActive() {
    final TransferState current = state;
    return current == TransferState.IDLE || current == TransferState.WORKING;
}
/**
 * Returns the number of connections currently registered for this transfer.
 *
 * @return current size of the connection list
 */
@Override
public int getActiveConnectionCount() {
    // The list is a plain ArrayList modified by other threads; every other
    // access holds its monitor, so this read has to as well
    synchronized (downloads) {
        return downloads.size();
    }
}
/**
 * Last-resort clean-up: closes the file handle and removes a left-over
 * temporary file. Failures are ignored on purpose, there is nothing
 * sensible that could be done about them at this point.
 */
@Override
protected void finalize() {
    try {
        Util.safeClose(tmpFileHandle);
        // A repair upload writes directly to the existing target file, which
        // must survive this object - same rule as in cancel()
        if (!isRepairUpload() && tmpFileName.exists())
            tmpFileName.delete();
    } catch (Throwable ignored) {
        // Best effort only, see method comment
    }
}
}