summaryrefslogblamecommitdiffstats
path: root/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
blob: ae9ce3b47a87b2777cd436e45e1e10aabf5e1ee1 (plain) (tree)
1
2
3
4
5
6
7
8
9
                                      
 
                    


                                     
                                              
                             
                           
                      
                                               
                                                 
 

                                
                               
                                                     
                                              
                                               
                                           

                                            
                                      
                                                      
                                                      
                                                       
                                                         

                                                    
                                              



                                                     

                                               


                                                                   

                                                                 
 
                                                                                         
 
                                                                                          

           

                                            
                                                               

           
                                                                       
           
                                                               
 
                                       
 
                                                     
 
                                       
 
                                    
 




                                          




                                                       










                                                                   









                                                                               


                                                                                  
                                            
 




                                                             

                                                     

                                                                             


                               
                                                                                     




                                                      
 


                                                                                                              


                                                                                 
                                                                                             
                                   
                                   
                                         
                                                             

























                                                                                                                 
                                                                                                       










































                                                                                                                                                                                              




                                                

                      

                      










                                                                              









                                                                            
                                                                                                         
                                                                                    
                                     
                                          
                                                                                       


                                                  



                                                     
                                                                                  
                                                                             




                                                                                                                               

                                                                                                                        


                                                                                                              

                                                                             


                                                                  
                                         
                                 
                           

                                                                                         


                                                             

                                     


                                                      



                            

                                                                   





                            
                                                                                  
                                              

                                                                                                            
                             

                                                                         
                                                 
                                                                              
                                                                                                                   
                                                     

                         


                                    

         



                                                  
                                              

                                                           
                                                      

                                                       




                                                                                                                         
                                                                            
                                                                                





                                                                                                                    




                                                                                                             
                                 
                               
                 


                                                                                               




                                                 
                                                                






                                            


                                                                                                                
                               


                                               
                     
                                                           
                                                                                                                


                                                                                             


                                                                                                  
                                                                                    

                                                            

                               

         
                 
                                           
                                                                                      




                                                                        



                                                               
                 
                                        



                                         

         
           







                                                                     
                                          
                                        






                                     






                                                                                    
                                                      


                                               
                                                                       







                                                          


                                                                                           





                                                                                                                           




                                                   
                                                                                               





                                                                                                      














                                                                                                                                  





                                                                                               
                                         
                                        
                                                                                                                              

                                                                            

                                                 



                                                                   
                                         

                                            
                                                   
                                                                                                               
                         





                                                                                           



                                                  
                                                        


                                                                          

                                                                                    

                                                                                  

                                 

                                                                                        



                                                                                              


                                                  

                              

                                                                                                             
                                                 
                              


                 
                                                           
                                              

                                                                                          
                             
                                                                            
                                                                                  
                                                                



                                                                                                             
                                                                                             





                                                                

                                                                                  
                                                            

                                                

                                                                        
                                                               




                                                                                       


                                                                     





                                                                   








                                                                                     
 









                                                      
 
package org.openslx.bwlp.sat.fileserv;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.ssl.SSLContext;

import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.database.mappers.DbImage;
import org.openslx.bwlp.sat.thrift.ThriftUtil;
import org.openslx.bwlp.sat.util.Configuration;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.sat.util.FileSystem;
import org.openslx.bwlp.sat.util.Formatter;
import org.openslx.bwlp.sat.util.Util;
import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.ImageVersionWrite;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.bwlp.thrift.iface.UserInfo;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.filetransfer.util.ChunkList;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.filetransfer.util.HashChecker;
import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
import org.openslx.filetransfer.util.HashChecker.HashResult;
import org.openslx.util.vm.DiskImage;
import org.openslx.util.vm.DiskImage.UnknownImageFormatException;

public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback {

	private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class);

	/**
	 * Self reference for inner classes.
	 */
	private final IncomingDataTransfer activeUpload = this;

	/**
	 * Remote peer is uploading, so on our end, we have Downloaders
	 */
	private List<Downloader> downloads = new ArrayList<>();

	private final File tmpFileName;

	private final RandomAccessFile tmpFileHandle;

	private final ChunkList chunks;

	private final long fileSize;

	/**
	 * User owning this uploaded file.
	 */
	private final UserInfo owner;

	/**
	 * Base image this upload is a new version for.
	 */
	private final ImageDetailsRead image;

	/**
	 * Flags to set for this new image version. Optional field.
	 */
	private ImageVersionWrite versionSettings = null;

	/**
	 * TransferState of this upload
	 */
	private TransferState state = TransferState.IDLE;

	/**
	 * Description of this VM - binary dump of e.g. the *.vmx file (VMware)
	 */
	private final byte[] machineDescription;

	/**
	 * Indicated whether the version information was written to db already.
	 * Disallow setVersionData in that case.
	 */
	private final AtomicBoolean versionWrittenToDb = new AtomicBoolean();

	/**
	 * Whether file is (still) writable. Used for the file transfer callbacks.
	 */
	private boolean fileWritable = true;

	/**
	 * Set if this is a download from the master server
	 */
	private final TransferInformation masterTransferInfo;

	private static final HashChecker hashChecker;

	private static final long MIN_FREE_SPACE_BYTES = 64l * 1024l * 1024l;

	static {
		HashChecker hc;
		try {
			hc = new HashChecker("SHA-1", Constants.HASHCHECK_QUEUE_LEN);
		} catch (NoSuchAlgorithmException e) {
			hc = null;
		}
		hashChecker = hc;
	}

	public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image,
			File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription)
			throws FileNotFoundException {
		super(uploadId);
		this.tmpFileName = destinationFile;
		this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw");
		this.chunks = new ChunkList(fileSize, hashChecker == null ? null : sha1Sums);
		this.owner = owner;
		this.image = image;
		this.fileSize = fileSize;
		this.machineDescription = machineDescription;
		this.masterTransferInfo = null;
	}

	public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo)
			throws FileNotFoundException {
		super(publishData.imageVersionId);
		ImageDetailsRead idr = new ImageDetailsRead();
		idr.setCreateTime(publishData.createTime);
		idr.setDescription(publishData.description);
		idr.setImageBaseId(publishData.imageBaseId);
		idr.setImageName(publishData.imageName);
		idr.setIsTemplate(publishData.isTemplate);
		idr.setLatestVersionId(publishData.imageVersionId);
		idr.setOsId(publishData.osId);
		idr.setOwnerId(publishData.user.userId);
		idr.setTags(publishData.tags);
		idr.setUpdaterId(publishData.user.userId);
		idr.setUpdateTime(publishData.createTime);
		idr.setVirtId(publishData.virtId);
		this.tmpFileName = tmpFile;
		this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw");
		this.chunks = new ChunkList(publishData.fileSize, hashChecker == null ? null
				: ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes));
		this.owner = publishData.user;
		this.image = idr;
		this.fileSize = publishData.fileSize;
		this.machineDescription = ThriftUtil.unwrapByteBuffer(transferInfo.machineDescription);
		this.masterTransferInfo = transferInfo;
		this.versionSettings = new ImageVersionWrite(false);
	}

	/**
	 * Called periodically if this is a transfer from the master server, so we
	 * can make sure the transfer is running.
	 */
	public void heartBeat(ThreadPoolExecutor pool) {
		if (masterTransferInfo == null)
			return;
		synchronized (this) {
			synchronized (downloads) {
				if (downloads.size() >= 1) // TODO What to pick here?
					return;
			}
			Downloader downloader = null;
			if (masterTransferInfo.plainPort != 0) {
				try {
					downloader = new Downloader(Configuration.getMasterServerAddress(),
							masterTransferInfo.plainPort, Constants.TRANSFER_TIMEOUT, null,
							masterTransferInfo.token);
				} catch (Exception e1) {
					LOGGER.debug("Plain connect failed", e1);
					downloader = null;
				}
			}
			if (downloader == null && masterTransferInfo.sslPort != 0) {
				try {
					downloader = new Downloader(Configuration.getMasterServerAddress(),
							masterTransferInfo.sslPort, Constants.TRANSFER_TIMEOUT, SSLContext.getDefault(), // TODO: Use the TLSv1.2 one once the master is ready
							masterTransferInfo.token);
				} catch (Exception e2) {
					LOGGER.debug("SSL connect failed", e2);
					downloader = null;
				}
			}
			if (downloader == null) {
				LOGGER.warn("Could not connect to master server for downloading " + image.imageName);
				return;
			}
			addConnection(downloader, pool);
		}
	}

	/**
	 * Set meta data for this image version.
	 * 
	 * @param user
	 * 
	 * @param data
	 */
	public boolean setVersionData(UserInfo user, ImageVersionWrite data) {
		synchronized (versionWrittenToDb) {
			if (versionWrittenToDb.get()) {
				return false;
			}
			if (!user.userId.equals(owner.userId)) {
				return false;
			}
			versionSettings = data;
			return true;
		}
	}

	/**
	 * Add another connection for this file transfer. Currently only one
	 * connection is allowed, but this might change in the future.
	 * 
	 * @param connection
	 * @return true if the connection is accepted, false if it should be
	 *         discarded
	 */
	public synchronized boolean addConnection(final Downloader connection, ThreadPoolExecutor pool) {
		if (state == TransferState.FINISHED || state == TransferState.ERROR)
			return false;
		synchronized (downloads) {
			if (downloads.size() >= Constants.MAX_CONNECTIONS_PER_TRANSFER)
				return false;
			downloads.add(connection);
		}
		try {
			pool.execute(new Runnable() {
				@Override
				public void run() {
					CbHandler cbh = new CbHandler(connection);
					if (!connection.download(cbh, cbh)) {
						if (cbh.currentChunk != null) {
							// If the download failed and we have a current chunk, put it back into
							// the queue, so it will be handled again later...
							chunks.markFailed(cbh.currentChunk);
						}
						LOGGER.warn("Download of " + tmpFileName.getAbsolutePath() + " failed");
					}
					if (state != TransferState.FINISHED && state != TransferState.ERROR) {
						lastActivityTime.set(System.currentTimeMillis());
					}
					synchronized (downloads) {
						downloads.remove(connection);
					}
					if (chunks.isComplete()) {
						finishUpload();
					}
				}
			});
		} catch (Exception e) {
			LOGGER.warn("threadpool rejected the incoming file transfer", e);
			synchronized (downloads) {
				downloads.remove(connection);
			}
			return false;
		}
		if (state == TransferState.IDLE) {
			state = TransferState.WORKING;
		}
		return true;
	}

	/**
	 * Write some data to the local file. Thread safe so we can
	 * have multiple concurrent connections.
	 * 
	 * @param fileOffset
	 * @param dataLength
	 * @param data
	 * @return
	 */
	private void writeFileData(long fileOffset, int dataLength, byte[] data) {
		synchronized (tmpFileHandle) {
			if (state != TransferState.WORKING)
				throw new IllegalStateException("Cannot write to file if state != WORKING");
			try {
				tmpFileHandle.seek(fileOffset);
				tmpFileHandle.write(data, 0, dataLength);
			} catch (IOException e) {
				LOGGER.error("Cannot write to '" + tmpFileName
						+ "'. Disk full, network storage error, bad permissions, ...?", e);
				fileWritable = false;
			}
		}
		if (!fileWritable) {
			cancel();
		}
	}

	/**
	 * Called when the upload finished.
	 */
	private synchronized void finishUpload() {
		synchronized (tmpFileHandle) {
			if (state != TransferState.WORKING)
				return;
			Util.safeClose(tmpFileHandle);
			state = TransferState.FINISHED;
		}
		potentialFinishTime.set(System.currentTimeMillis());
		// If owner is not set, this was a repair-transfer, which downloads directly to the existing target file.
		// Nothing more to do in that case.
		if (isRepairUpload())
			return;
		LOGGER.info("Finalizing uploaded image " + image.imageName);
		// Ready to go. First step: Rename temp file to something usable
		String ext = "img";
		try {
			ext = new DiskImage(tmpFileName).format.extension;
		} catch (IOException | UnknownImageFormatException e1) {
		}
		File destination = new File(tmpFileName.getParent(), Formatter.vmName(owner, image.imageName, ext));
		// Sanity check: destination should be a sub directory of the vmStorePath
		String relPath = FileSystem.getRelativePath(destination, Configuration.getVmStoreBasePath());
		if (relPath == null) {
			LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of "
					+ Configuration.getVmStoreBasePath().getAbsolutePath());
			cancel();
			return;
		}
		if (relPath.length() > 200) {
			LOGGER.error("Generated file name is >200 chars. DB will not like it");
		}

		// Execute rename
		boolean ret = false;
		Exception renameException = null;
		try {
			ret = tmpFileName.renameTo(destination);
		} catch (Exception e) {
			ret = false;
			renameException = e;
		}
		if (!ret) {
			// Rename failed :-(
			LOGGER.warn(
					"Could not rename '" + tmpFileName.getAbsolutePath() + "' to '"
							+ destination.getAbsolutePath() + "'", renameException);
			cancel();
			return;
		}

		// Now insert meta data into DB
		try {
			synchronized (versionWrittenToDb) {
				DbImage.createImageVersion(image.imageBaseId, getId(), owner, fileSize, relPath,
						versionSettings, chunks, machineDescription);
				versionWrittenToDb.set(true);
			}
		} catch (SQLException e) {
			LOGGER.error("Error finishing upload: Inserting version to DB failed", e);
			state = TransferState.ERROR;
			// Also delete uploaded file, as there is no reference to it
			FileSystem.deleteAsync(destination);
			cancel();
			return;
		}
	}

	@Override
	public synchronized void cancel() {
		if (state != TransferState.FINISHED && state != TransferState.ERROR) {
			state = TransferState.ERROR;
			if (!isRepairUpload() && tmpFileName.exists()) {
				FileSystem.deleteAsync(tmpFileName);
			}
		}
		synchronized (downloads) {
			for (Downloader download : downloads) {
				download.cancel();
			}
		}
		lastActivityTime.set(0);
	}

	public boolean isRepairUpload() {
		return owner == null;
	}

	/**
	 * Get user owning this upload. Can be null in special cases.
	 * 
	 * @return instance of UserInfo for the according user.
	 */
	public UserInfo getOwner() {
		return this.owner;
	}

	public File getDestinationFile() {
		return this.tmpFileName;
	}

	public long getSize() {
		return this.fileSize;
	}

	/**
	 * Callback class for an instance of the Downloader, which supplies
	 * the Downloader with wanted file ranges, and handles incoming data.
	 */
	private class CbHandler implements WantRangeCallback, DataReceivedCallback {
		/**
		 * The current chunk being transfered.
		 */
		private FileChunk currentChunk = null;
		/**
		 * Current buffer to receive to
		 */
		private byte[] buffer = new byte[FileChunk.CHUNK_SIZE];
		/**
		 * Downloader object
		 */
		private final Downloader downloader;

		private CbHandler(Downloader downloader) {
			this.downloader = downloader;
		}

		@Override
		public boolean dataReceived(long fileOffset, int dataLength, byte[] data) {
			if (currentChunk == null)
				throw new IllegalStateException("dataReceived without current chunk");
			if (!currentChunk.range.contains(fileOffset, fileOffset + dataLength))
				throw new IllegalStateException("dataReceived with file data out of range");
			System.arraycopy(data, 0, buffer, (int) (fileOffset - currentChunk.range.startOffset), dataLength);
			return fileWritable;
		}

		@Override
		public FileRange get() {
			if (currentChunk != null) {
				if (hashChecker != null && currentChunk.getSha1Sum() != null) {
					try {
						hashChecker.queue(currentChunk, buffer, activeUpload);
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
						return null;
					}
					try {
						buffer = new byte[buffer.length];
					} catch (OutOfMemoryError e) {
						// Usually catching OOM errors is a bad idea, but it's quite safe here as
						// we know exactly where it happened, no hidden sub-calls through 20 objects.
						// The most likely cause here is that the hash checker/disk cannot keep up
						// writing out completed chunks, so we just sleep a bit and try again. If it still
						// fails, we exit completely.
						try {
							Thread.sleep(6000);
						} catch (InterruptedException e1) {
							Thread.currentThread().interrupt();
							return null;
						}
						// Might raise OOM again, but THIS TIME I MEAN IT
						try {
							buffer = new byte[buffer.length];
						} catch (OutOfMemoryError e2) {
							downloader.sendErrorCode("Out of RAM");
							cancel();
						}
					}
				} else {
					writeFileData(currentChunk.range.startOffset, currentChunk.range.getLength(), buffer);
					chunks.markSuccessful(currentChunk);
				}
			}
			// Get next missing chunk
			try {
				currentChunk = chunks.getMissing();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				cancel();
				return null;
			}
			if (currentChunk == null) {
				return null; // No more chunks, returning null tells the Downloader we're done.
			}
			// Check remaining disk space and abort if it's too low
			if (FileSystem.getAvailableStorageBytes() < MIN_FREE_SPACE_BYTES) {
				downloader.sendErrorCode("Out of disk space");
				cancel();
				return null;
			}
			return currentChunk.range;
		}
	}

	public synchronized TransferStatus getStatus() {
		return new TransferStatus(chunks.getStatusArray(), state);
	}

	@Override
	public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) {
		if (state != TransferState.IDLE && state != TransferState.WORKING)
			return;
		switch (result) {
		case FAILURE:
			LOGGER.warn("Hash check of chunk " + chunk.toString()
					+ " could not be executed. Assuming valid :-(");
			// Fall through
		case VALID:
			writeFileData(chunk.range.startOffset, chunk.range.getLength(), data);
			chunks.markSuccessful(chunk);
			if (chunks.isComplete()) {
				finishUpload();
			}
			break;
		case INVALID:
			LOGGER.warn("Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch "
					+ chunk.getFailCount() + "x :-(");
			chunks.markFailed(chunk);
			break;
		}
	}

	private byte[] loadChunkFromFile(FileChunk chunk) {
		synchronized (tmpFileHandle) {
			if (state != TransferState.IDLE && state != TransferState.WORKING)
				return null;
			try {
				tmpFileHandle.seek(chunk.range.startOffset);
				byte[] buffer = new byte[chunk.range.getLength()];
				tmpFileHandle.readFully(buffer);
				return buffer;
			} catch (IOException e) {
				LOGGER.error(
						"Could not read chunk " + chunk.getChunkIndex() + " of File "
								+ tmpFileName.toString(), e);
				return null;
			}
		}
	}

	public void updateBlockHashList(List<byte[]> hashList) {
		if (state != TransferState.IDLE && state != TransferState.WORKING)
			return;
		if (hashChecker == null || hashList == null)
			return;
		chunks.updateSha1Sums(hashList);
		FileChunk chunk;
		while (null != (chunk = chunks.getUnhashedComplete())) {
			byte[] data = loadChunkFromFile(chunk);
			if (data == null) {
				LOGGER.warn("Will mark unloadable chunk as valid :-(");
				chunks.markSuccessful(chunk);
				continue;
			}
			try {
				hashChecker.queue(chunk, data, this);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				return;
			}
		}
	}

	@Override
	public boolean isActive() {
		return state == TransferState.IDLE || state == TransferState.WORKING;
	}

	@Override
	public int getActiveConnectionCount() {
		return downloads.size();
	}

	@Override
	protected void finalize() {
		try {
			Util.safeClose(tmpFileHandle);
			if (tmpFileName.exists())
				tmpFileName.delete();
		} catch (Throwable t) {
		}
	}

}