summaryrefslogblamecommitdiffstats
path: root/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java
blob: 08d0d30fe420a73db1a51e1f7e9e7086dcca7f9d (plain) (tree)
1
2
3
4
5
6
7
8
9
                                      
 
                    


                                     
                           
                             
                      


                                               
                                                     


                                               
                                      
                                                      


                                                       
                                              



                                                     

                                               
 







                                                                                  
                                           


                                               
                                       
 
                                    
 




                                          




                                                       














                                                                      

                                              

                                                                                                          
                                                       


                                                                           
                                   
                                         









                                                                         










                                                                                                   

                                                                                              

                                      












                                                                                                                               
                                 





                                                                                         












                                                                                     

                                                                                                    












                                                                                                                   









                                                       







                                                                                                             

                                                    















                                                                                                                                

                                                    


                                               







                                                                                                         

         
                                           



                                                      
                                                    
                 

         
           







                                                                     

                                                       










                                            






















                                                                                                             

                                                                       
                                                                           

                                                 
                                                           


                                                                           
                                                                                                               
                         



                                                  



                                                                          

                                           
 
package org.openslx.bwlp.sat.fileserv;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.database.mappers.DbImage;
import org.openslx.bwlp.sat.util.Configuration;
import org.openslx.bwlp.sat.util.FileSystem;
import org.openslx.bwlp.sat.util.Formatter;
import org.openslx.bwlp.sat.util.Util;
import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
import org.openslx.bwlp.thrift.iface.ImageVersionWrite;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.bwlp.thrift.iface.UserInfo;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.filetransfer.util.ChunkList;
import org.openslx.filetransfer.util.FileChunk;

public class ActiveUpload {
	private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class);

	/**
	 * This is an active upload, so on our end, we have a Downloader.
	 */
	private Downloader download = null;

	private final File destinationFile;

	private final RandomAccessFile outFile;

	private final ChunkList chunks;

	private final long fileSize;

	/**
	 * User owning this uploaded file.
	 */
	private final UserInfo owner;

	/**
	 * Base image this upload is a new version for.
	 */
	private final ImageDetailsRead image;

	/**
	 * Flags to set for this new image version. Optional field.
	 */
	private ImageVersionWrite versionSettings = null;

	/**
	 * TransferState of this upload
	 */
	private TransferState state = TransferState.IDLE;

	/**
	 * ID of this upload - will become this version id on success.
	 */
	private final String uploadId;

	// TODO: Use HashList for verification

	public ActiveUpload(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile,
			long fileSize, List<ByteBuffer> sha1Sums) throws FileNotFoundException {
		this.destinationFile = destinationFile;
		this.outFile = new RandomAccessFile(destinationFile, "rw");
		this.chunks = new ChunkList(fileSize, sha1Sums);
		this.owner = owner;
		this.image = image;
		this.fileSize = fileSize;
		this.uploadId = uploadId;
	}

	/**
	 * Set meta data for this image version.
	 * 
	 * @param data
	 */
	public synchronized void setVersionData(ImageVersionWrite data) {
		versionSettings = data;
	}

	/**
	 * Add another connection for this file transfer. Currently only one
	 * connection is allowed, but this might change in the future.
	 * 
	 * @param connection
	 * @return true if the connection is accepted, false if it should be
	 *         discarded
	 */
	public synchronized boolean addConnection(Downloader connection, ThreadPoolExecutor pool) {
		if (download != null || chunks.isComplete() || state == TransferState.FINISHED
				|| state == TransferState.ERROR)
			return false;
		download = connection;
		try {
			pool.execute(new Runnable() {
				@Override
				public void run() {
					CbHandler cbh = new CbHandler();
					if (!download.download(cbh, cbh)) {
						if (cbh.currentChunk != null) {
							// If the download failed and we have a current chunk, put it back into
							// the queue, so it will be handled again later...
							chunks.markFailed(cbh.currentChunk);
						}
						LOGGER.warn("Download of " + destinationFile.getAbsolutePath());
					}
				}
			});
			state = TransferState.WORKING;
		} catch (Exception e) {
			LOGGER.warn("threadpool rejected the incoming file transfer", e);
			return false;
		}
		return true;
	}

	/**
	 * Write some data to the local file. Thread safe so we could
	 * have multiple concurrent connections later.
	 * 
	 * @param fileOffset
	 * @param dataLength
	 * @param data
	 * @return
	 */
	private boolean writeFileData(long fileOffset, int dataLength, byte[] data) {
		if (state != TransferState.WORKING)
			throw new IllegalStateException("Cannot write to file if state != RUNNING");
		synchronized (outFile) {
			try {
				outFile.seek(fileOffset);
				outFile.write(data, 0, dataLength);
			} catch (IOException e) {
				LOGGER.error("Cannot write to '" + destinationFile
						+ "'. Disk full, network storage error, bad permissions, ...?", e);
				return false;
			}
		}
		return true;
	}

	/**
	 * Called when the upload finished.
	 */
	private synchronized void finishUpload() {
		if (state != TransferState.WORKING)
			return;
		synchronized (outFile) {
			Util.safeClose(outFile);
			state = TransferState.FINISHED;
		}
		File file = destinationFile;
		// Ready to go. First step: Rename temp file to something usable
		File destination = new File(file.getParent(), Formatter.vmName(owner, image.imageName));
		// Sanity check: destination should be a sub directory of the vmStorePath
		String relPath = FileSystem.getRelativePath(destination, Configuration.getVmStoreBasePath());
		if (relPath == null) {
			LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of "
					+ Configuration.getVmStoreBasePath().getAbsolutePath());
			state = TransferState.ERROR;
			return;
		}

		// Execute rename
		boolean ret = false;
		Exception renameException = null;
		try {
			ret = file.renameTo(destination);
		} catch (Exception e) {
			ret = false;
			renameException = e;
		}
		if (!ret) {
			// Rename failed :-(
			LOGGER.warn(
					"Could not rename '" + file.getAbsolutePath() + "' to '" + destination.getAbsolutePath()
							+ "'", renameException);
			state = TransferState.ERROR;
			return;
		}

		// Now insert meta data into DB
		try {
			DbImage.createImageVersion(image.imageBaseId, uploadId, owner, fileSize, relPath,
					versionSettings, chunks);
		} catch (SQLException e) {
			LOGGER.error("Error finishing upload: Inserting version to DB failed", e);
			state = TransferState.ERROR;
			return;
		}
	}

	public synchronized void cancel() {
		if (download != null) {
			download.cancel();
		}
		if (state != TransferState.FINISHED) {
			state = TransferState.ERROR;
		}
	}

	/**
	 * Get user owning this upload. Can be null in special cases.
	 * 
	 * @return instance of UserInfo for the according user.
	 */
	public UserInfo getOwner() {
		return this.owner;
	}

	public synchronized boolean isComplete() {
		return state == TransferState.FINISHED;
	}

	public File getDestinationFile() {
		return this.destinationFile;
	}

	public long getSize() {
		return this.fileSize;
	}

	/**
	 * Callback class for an instance of the Downloader, which supplies
	 * the Downloader with wanted file ranges, and handles incoming data.
	 */
	private class CbHandler implements WantRangeCallback, DataReceivedCallback {
		/**
		 * The current chunk being transfered.
		 */
		public FileChunk currentChunk = null;

		@Override
		public boolean dataReceived(long fileOffset, int dataLength, byte[] data) {
			// TODO: Maybe cache in RAM and write full CHUNK_SIZE blocks at a time?
			// Would probably help with slower storage, especially if it's using
			// rotating disks and we're running multiple uploads.
			// Also we wouldn't have to re-read a block form disk for sha1 checking.
			return writeFileData(fileOffset, dataLength, data);
		}

		@Override
		public FileRange get() {
			if (currentChunk != null) {
				// TODO: A chunk was requested before, check hash and requeue if not matching
				// This needs to be async (own thread) so will be a little complicated
				// For now, we just mark it as complete
				chunks.markSuccessful(currentChunk);
				LOGGER.info("There was a previous chunk!");
			}
			// Get next missing chunk
			currentChunk = chunks.getMissing();
			LOGGER.info("Next missing chunk: " + currentChunk);
			if (currentChunk == null) {
				finishUpload();
				return null; // No more chunks, returning null tells the Downloader we're done.
			}
			return currentChunk.range;
		}
	}

	public TransferStatus getStatus() {
		return new TransferStatus(chunks.getStatusArray(), state);
	}

	// TODO: Clean up old stale uploads

}