package org.openslx.bwlp.sat.fileserv; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.sql.SQLException; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.openslx.bwlp.sat.database.mappers.DbImage; import org.openslx.bwlp.sat.util.Configuration; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; import org.openslx.bwlp.sat.util.Util; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImageVersionWrite; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.filetransfer.DataReceivedCallback; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.FileRange; import org.openslx.filetransfer.WantRangeCallback; import org.openslx.filetransfer.util.ChunkList; import org.openslx.filetransfer.util.FileChunk; public class ActiveUpload { private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class); /** * This is an active upload, so on our end, we have a Downloader. */ private Downloader download = null; private final File destinationFile; private final RandomAccessFile outFile; private final ChunkList chunks; private final long fileSize; /** * User owning this uploaded file. */ private final UserInfo owner; /** * Base image this upload is a new version for. */ private final ImageDetailsRead image; /** * Flags to set for this new image version. Optional field. */ private ImageVersionWrite versionSettings = null; /** * TransferState of this upload */ private TransferState state = TransferState.IDLE; /** * ID of this upload - will become this version id on success. */ private final String uploadId; /** * Description of this VM - binary dump of e.g. the *.vmx file (VMware) */ private final byte[] machineDescription; /** * Indicated whether the version information was written to db already. * Disallow setVersionData in that case. */ private final AtomicBoolean versionWrittenToDb = new AtomicBoolean(); // TODO: Use HashList for verification public ActiveUpload(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile, long fileSize, List sha1Sums, byte[] machineDescription) throws FileNotFoundException { this.destinationFile = destinationFile; this.outFile = new RandomAccessFile(destinationFile, "rw"); this.chunks = new ChunkList(fileSize, sha1Sums); this.owner = owner; this.image = image; this.fileSize = fileSize; this.uploadId = uploadId; this.machineDescription = machineDescription; } /** * Set meta data for this image version. * * @param user * * @param data */ public boolean setVersionData(UserInfo user, ImageVersionWrite data) { synchronized (versionWrittenToDb) { if (versionWrittenToDb.get()) { return false; } if (!user.userId.equals(owner.userId)) { return false; } versionSettings = data; return true; } } /** * Add another connection for this file transfer. Currently only one * connection is allowed, but this might change in the future. * * @param connection * @return true if the connection is accepted, false if it should be * discarded */ public synchronized boolean addConnection(Downloader connection, ThreadPoolExecutor pool) { if (download != null || chunks.isComplete() || state == TransferState.FINISHED || state == TransferState.ERROR) return false; download = connection; try { pool.execute(new Runnable() { @Override public void run() { CbHandler cbh = new CbHandler(); if (!download.download(cbh, cbh)) { if (cbh.currentChunk != null) { // If the download failed and we have a current chunk, put it back into // the queue, so it will be handled again later... chunks.markFailed(cbh.currentChunk); } LOGGER.warn("Download of " + destinationFile.getAbsolutePath()); } } }); state = TransferState.WORKING; } catch (Exception e) { LOGGER.warn("threadpool rejected the incoming file transfer", e); return false; } return true; } /** * Write some data to the local file. Thread safe so we could * have multiple concurrent connections later. * * @param fileOffset * @param dataLength * @param data * @return */ private boolean writeFileData(long fileOffset, int dataLength, byte[] data) { if (state != TransferState.WORKING) throw new IllegalStateException("Cannot write to file if state != RUNNING"); synchronized (outFile) { try { outFile.seek(fileOffset); outFile.write(data, 0, dataLength); } catch (IOException e) { LOGGER.error("Cannot write to '" + destinationFile + "'. Disk full, network storage error, bad permissions, ...?", e); return false; } } return true; } /** * Called when the upload finished. */ private synchronized void finishUpload() { synchronized (outFile) { if (state != TransferState.WORKING) return; Util.safeClose(outFile); state = TransferState.FINISHED; } File file = destinationFile; // Ready to go. First step: Rename temp file to something usable File destination = new File(file.getParent(), Formatter.vmName(owner, image.imageName)); // Sanity check: destination should be a sub directory of the vmStorePath String relPath = FileSystem.getRelativePath(destination, Configuration.getVmStoreBasePath()); if (relPath == null) { LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of " + Configuration.getVmStoreBasePath().getAbsolutePath()); state = TransferState.ERROR; return; } // Execute rename boolean ret = false; Exception renameException = null; try { ret = file.renameTo(destination); } catch (Exception e) { ret = false; renameException = e; } if (!ret) { // Rename failed :-( LOGGER.warn( "Could not rename '" + file.getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'", renameException); state = TransferState.ERROR; return; } // Now insert meta data into DB try { synchronized (versionWrittenToDb) { DbImage.createImageVersion(image.imageBaseId, uploadId, owner, fileSize, relPath, versionSettings, chunks, machineDescription); versionWrittenToDb.set(true); } } catch (SQLException e) { LOGGER.error("Error finishing upload: Inserting version to DB failed", e); state = TransferState.ERROR; return; } } public synchronized void cancel() { if (download != null) { download.cancel(); } if (state != TransferState.FINISHED) { state = TransferState.ERROR; } } /** * Get user owning this upload. Can be null in special cases. * * @return instance of UserInfo for the according user. */ public UserInfo getOwner() { return this.owner; } public synchronized boolean isComplete() { return state == TransferState.FINISHED; } public File getDestinationFile() { return this.destinationFile; } public long getSize() { return this.fileSize; } /** * Callback class for an instance of the Downloader, which supplies * the Downloader with wanted file ranges, and handles incoming data. */ private class CbHandler implements WantRangeCallback, DataReceivedCallback { /** * The current chunk being transfered. */ public FileChunk currentChunk = null; @Override public boolean dataReceived(long fileOffset, int dataLength, byte[] data) { // TODO: Maybe cache in RAM and write full CHUNK_SIZE blocks at a time? // Would probably help with slower storage, especially if it's using // rotating disks and we're running multiple uploads. // Also we wouldn't have to re-read a block form disk for sha1 checking. return writeFileData(fileOffset, dataLength, data); } @Override public FileRange get() { if (currentChunk != null) { // TODO: A chunk was requested before, check hash and requeue if not matching // This needs to be async (own thread) so will be a little complicated // For now, we just mark it as complete chunks.markSuccessful(currentChunk); LOGGER.info("There was a previous chunk!"); } // Get next missing chunk currentChunk = chunks.getMissing(); LOGGER.info("Next missing chunk: " + currentChunk); if (currentChunk == null) { finishUpload(); return null; // No more chunks, returning null tells the Downloader we're done. } return currentChunk.range; } } public synchronized TransferStatus getStatus() { return new TransferStatus(chunks.getStatusArray(), state); } public String getId() { return uploadId; } // TODO: Clean up old stale uploads }