summaryrefslogblamecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
blob: 0e901e52bc53b6e1379a43876abd0b8d4748b552 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11

                                      
                            







                                              
                             





                                                   
                                                    


                                                     

                                                             





































                                                                                                









                                                                          


                                                               



                                                                                                                       
                 

                                                                                             

                                         

                                                     
                 
                                                                










                                                                      
                                                                                                                                                      



                                                    
                                                         



                                                                          






                                                                                                          
                                                                






                                                                                          


















                                                                                        
                                             


                                                     













































                                                                                      


                                                                                 

         























                                                                                                       

                                                                                         

                                          
                                                   



                                                                       
                                           






                                                                                                            

                                                                                                  
                         
                                             
                                                                                                                    
                                                                    



                                                            
                                                                                                                                    
                                                                             
                                              
                                 
                                                            

                                                                                      



                                                                   








                                                                                         
                                      
















                                                                             

         
                                                                               








                                                                                            

                                                    












































                                                                                                                                               
                                                      
                                                     

                                                                                              
                                                                     
                                         



                                                                                                        

                                                      

















                                                                                                                                  
                                                                                                                                                    



                                                                                                 
                                 
                                                    


















                                                                                                                           


                                                              



                                                  



































                                                                                                                                                   


















                                                                                         



                                                                                



                                                                                                                               





                                                                                                                             

                                                                                               
                                                                                                                                    

                                                                                                                






                                                                                                   

                                                                                               
                                                                           


                                                                                   












































                                                                                                       
                                                                                                                                            
















                                                                                                                    

                                                                                            
                               
                 





                                                                                         


                                                                    






                                                                                                                
                         










                                                                                                              


                                                                            
                 
                                                                  
                                            


                                                   




                                                                         
                                                             














                                                                                                                       
                                                            



                                                    




                                                                               

                                                                     
                                                    

                                                                                         

                                                           

         
                                                      



                                                        

                                                                                              
                                                                    
                         
                                           

                                                                                
                                           


                                                     








                                                                   




                                              


















                                                                                

                                                                                     
                                                                                                  
           
                                                                       
         
                             

         











                                                                                
 
package org.openslx.filetransfer.util;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.LocalChunkSource;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
import org.openslx.filetransfer.util.HashChecker.HashResult;
import org.openslx.util.ThriftUtil;

public abstract class IncomingTransferBase extends AbstractTransfer implements HashCheckCallback
{

	private static final Logger LOGGER = Logger.getLogger( IncomingTransferBase.class );

	/**
	 * Remote peer is uploading, so on our end, we have Downloaders.
	 * Modifications and iteration are guarded by synchronizing on this list,
	 * which is why the reference must never change.
	 */
	private final List<Downloader> downloads = new ArrayList<>();

	/**
	 * The file that received data is written to.
	 */
	private final File tmpFileName;

	/**
	 * Open handle to {@link #tmpFileName}. Also serves as the lock object
	 * for reading from and writing to the file.
	 */
	private final RandomAccessFile tmpFileHandle;

	/**
	 * Bookkeeping of all chunks of the file.
	 */
	private final ChunkList chunks;

	/**
	 * Current state of the transfer. Volatile, since it is read and written
	 * by pool worker threads and callbacks without one common lock.
	 */
	private volatile TransferState state = TransferState.IDLE;

	/**
	 * Expected final size of the file, in bytes.
	 */
	private final long fileSize;

	/**
	 * Shared hash checker; null if it could not be created.
	 */
	private static final HashChecker hashChecker;

	/*
	 * Overridable constants
	 */

	/**
	 * Maximum number of simultaneous connections accepted per transfer.
	 */
	protected static int MAX_CONNECTIONS_PER_TRANSFER = 2;

	/**
	 * Whether file is (still) writable. Used for the file transfer callbacks.
	 * Volatile, as it is set by whichever thread fails to write and read by
	 * the data callbacks of all connections.
	 */
	private volatile boolean fileWritable = true;

	/**
	 * Called for getting local sources for certain chunks by checksum
	 */
	private final LocalChunkSource localChunkSource;

	/**
	 * Non-null if local copying is requested
	 */
	private final LocalCopyManager localCopyManager;

	/*
	 * Create the shared hash checker. Its queue length is derived from the
	 * maximum JVM heap: one slot per 150MB, at least one slot, and at most
	 * max( 6, number of available processors ) slots.
	 */
	static {
		long maxMem = Runtime.getRuntime().maxMemory();
		if ( maxMem == Long.MAX_VALUE ) {
			LOGGER.warn( "Cannot determine maximum JVM memory -- assuming 1GB -- this might not be safe" );
			maxMem = 1024;
		} else {
			maxMem /= ( 1024 * 1024 ); // bytes -> MB
		}
		final int maxLen = Math.max( 6, Runtime.getRuntime().availableProcessors() );
		// Clamp while still a long, so the cast to int below can never overflow
		final int hashQueueLen = (int)Math.max( 1, Math.min( maxLen, maxMem / 150 ) );
		LOGGER.debug( "Queue length: " + hashQueueLen );
		HashChecker hc;
		try {
			hc = new HashChecker( "SHA-1", hashQueueLen );
		} catch ( NoSuchAlgorithmException e ) {
			// Don't fail silently: without a checker, no chunk will ever be verified
			LOGGER.warn( "SHA-1 is not available, hash checking of incoming chunks is disabled", e );
			hc = null;
		}
		hashChecker = hc;
	}

	/*_*/

	public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes, LocalChunkSource localChunkSource )
			throws FileNotFoundException
	{
		super( transferId );
		this.fileSize = fileSize;
		this.localChunkSource = localChunkSource;
		// Prepare path
		tmpFileName = absFilePath;
		tmpFileName.getParentFile().mkdirs();
		tmpFileHandle = new RandomAccessFile( absFilePath, "rw" );
		try {
			if ( tmpFileHandle.length() > fileSize ) {
				tmpFileHandle.setLength( fileSize );
			}
		} catch ( IOException e ) {
			LOGGER.debug( "File " + tmpFileName + " is too long and could not be truncated" );
		}
		chunks = new ChunkList( fileSize, blockHashes );
		if ( this.localChunkSource != null ) {
			this.localCopyManager = new LocalCopyManager( this, this.chunks );
			this.localCopyManager.start();
			checkLocalCopyCandidates( blockHashes, 0 );
		} else {
			this.localCopyManager = null;
		}
	}

	/**
	 * @return true while the transfer is either idle or working, false otherwise
	 */
	@Override
	public boolean isActive()
	{
		if ( state == TransferState.IDLE )
			return true;
		return state == TransferState.WORKING;
	}

	/**
	 * Abort this transfer. Unless the transfer already finished or failed, its
	 * state becomes ERROR. All known connections are cancelled, the local copy
	 * manager (if any) is interrupted and the file handle is closed.
	 */
	@Override
	public synchronized void cancel()
	{
		final boolean alreadyOver = ( state == TransferState.FINISHED || state == TransferState.ERROR );
		if ( !alreadyOver ) {
			state = TransferState.ERROR;
		}
		synchronized ( downloads ) {
			final Iterator<Downloader> it = downloads.iterator();
			while ( it.hasNext() ) {
				it.next().cancel();
			}
		}
		potentialFinishTime.set( 0 );
		if ( localCopyManager != null ) {
			localCopyManager.interrupt();
		}
		safeClose( tmpFileHandle );
	}

	/**
	 * @return number of connections currently registered for this transfer
	 */
	@Override
	public final int getActiveConnectionCount()
	{
		// The list is modified by other threads while holding this lock, so
		// take it for reading too
		synchronized ( downloads ) {
			return downloads.size();
		}
	}

	/**
	 * Check whether the given list of block hashes is identical to the hashes
	 * of the chunks of this transfer.
	 * 
	 * @param blockHashes list of hashes to compare against, one per chunk
	 * @return true if both lists have the same length and all hashes match;
	 *         false otherwise, in particular if the given list is null, if
	 *         there are no chunks at all, or if the hash of the first chunk
	 *         of this transfer is not known
	 */
	public final boolean hashesEqual( List<ByteBuffer> blockHashes )
	{
		if ( blockHashes == null )
			return false;
		List<FileChunk> existing = chunks.getAll();
		// Bail out on an empty list too, otherwise get( 0 ) below would throw
		if ( existing.isEmpty() || existing.size() != blockHashes.size() )
			return false;
		List<byte[]> hashes = ThriftUtil.unwrapByteBufferList( blockHashes );
		if ( hashes == null || hashes.size() != existing.size() )
			return false;
		FileChunk first = existing.get( 0 );
		if ( first == null || first.getSha1Sum() == null )
			return false;
		Iterator<byte[]> it = hashes.iterator();
		for ( FileChunk existingChunk : existing ) {
			byte[] testChunk = it.next();
			if ( !Arrays.equals( testChunk, existingChunk.getSha1Sum() ) )
				return false;
		}
		return true;
	}

	/*
	 * Getters for final/private fields
	 */

	/**
	 * @return expected size of the file in bytes, as passed to the constructor
	 */
	public final long getFileSize()
	{
		return fileSize;
	}

	/**
	 * @return the file that received data is written to
	 */
	public final File getTmpFileName()
	{
		return tmpFileName;
	}

	/**
	 * @return current state of this transfer
	 */
	public final TransferState getState()
	{
		return state;
	}

	/**
	 * @return new status object built from the status array of the chunk list
	 *         and the current state of this transfer
	 */
	public synchronized TransferStatus getStatus()
	{
		return new TransferStatus( chunks.getStatusArray(), getState() );
	}

	/**
	 * @return the chunk list of this transfer (the instance itself, not a copy)
	 */
	public final ChunkList getChunks()
	{
		return chunks;
	}

	/**
	 * It is possible to run a download where the remote peer didn't submit
	 * the full list of block hashes yet, as it might be about to hash the file
	 * while uploading. This method should be called to update the list
	 * of block hashes. This is a cumulative call, so the list must contain
	 * all hashes starting from block 0.
	 * <p>
	 * The call is ignored unless the transfer is idle or working. After the
	 * chunk list has been updated, up to three completed but not yet hashed
	 * chunks are read back from the file and queued for hashing, and finally
	 * the hashes are checked for local copy candidates.
	 * 
	 * @param hashList (incomplete) list of block hashes
	 */
	public void updateBlockHashList( List<byte[]> hashList )
	{
		if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
			LOGGER.debug( this.getId() + ": Rejecting block hash list in state " + state );
			return;
		}
		if ( hashList == null ) {
			LOGGER.debug( this.getId() + ": Rejecting null block hash list" );
			return;
		}
		int firstNew = chunks.updateSha1Sums( hashList );
		// No hash checker? Neither hashing nor server side dedup will make sense
		if ( hashChecker == null )
			return;
		// Check hashes of completed blocks
		// (at most three per call)
		for ( int cnt = 0; cnt < 3; ++cnt ) {
			FileChunk chunk = chunks.getUnhashedComplete();
			if ( chunk == null )
				break;
			byte[] data = null;
			try {
				data = loadChunkFromFile( chunk );
			} catch ( EOFException e1 ) {
				// The file ends before the end of this chunk, so it cannot be complete
				LOGGER.warn( "blockhash update: file too short, marking chunk as invalid" );
				chunks.markFailed( chunk );
				chunkStatusChanged( chunk );
				continue;
			} catch ( Exception e ) {
				// data stays null, handled right below
				LOGGER.warn( "unexpected fail while loading chunk from disk", e );
			}
			if ( data == null ) {
				// Chunk could not be read back, so there is nothing to hash
				LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" );
				chunks.markCompleted( chunk, true );
				chunkStatusChanged( chunk );
				continue;
			}
			try {
				if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) { // false == queue full, stop
					// Mark as completed but not hash-checked again
					chunks.markCompleted( chunk, false );
					break;
				}
			} catch ( InterruptedException e ) {
				LOGGER.debug( "updateBlockHashList got interrupted" );
				chunks.markCompleted( chunk, false );
				// Restore the interrupt flag for the caller
				Thread.currentThread().interrupt();
				return;
			}
		}
		// See if we have any candidates for local copy
		checkLocalCopyCandidates( hashList, firstNew );
	}

	/**
	 * Ask the local chunk source for sources of the given hashes, mark the
	 * matches in the chunk list and wake up the local copy manager.
	 * Does nothing if there is no local chunk source or no hashes are given.
	 * 
	 * @param hashList list of block hashes
	 * @param firstNew index of the first hash in the list to consider; values
	 *           of zero or less mean the whole list
	 */
	private void checkLocalCopyCandidates( List<byte[]> hashList, int firstNew )
	{
		if ( localChunkSource == null || hashList == null || hashList.isEmpty() )
			return;
		List<byte[]> wanted = ( firstNew > 0 ) ? hashList.subList( firstNew, hashList.size() ) : hashList;
		if ( wanted == null )
			return;
		// Hand out a read-only view, so the chunk source cannot alter the list
		wanted = Collections.unmodifiableList( wanted );
		List<ChunkSource> found;
		try {
			found = localChunkSource.getCloneSources( wanted );
		} catch ( Exception e ) {
			LOGGER.warn( "Could not get chunk sources", e );
			found = null;
		}
		final boolean haveCandidates = ( found != null && !found.isEmpty() );
		if ( haveCandidates ) {
			chunks.markLocalCopyCandidates( found );
		}
		localCopyManager.trigger();
	}

	/**
	 * Read the data of the given chunk back from the file.
	 * 
	 * @param chunk the chunk to load
	 * @return data of the chunk, or null if the transfer is neither idle nor
	 *         working, or if reading failed for any reason other than hitting
	 *         the end of the file
	 * @throws EOFException if the file ends before the end of the chunk
	 */
	private byte[] loadChunkFromFile( FileChunk chunk ) throws EOFException
	{
		synchronized ( tmpFileHandle ) {
			final boolean usable = ( state == TransferState.IDLE || state == TransferState.WORKING );
			if ( !usable )
				return null;
			try {
				tmpFileHandle.seek( chunk.range.startOffset );
				final byte[] chunkData = new byte[ chunk.range.getLength() ];
				tmpFileHandle.readFully( chunkData );
				return chunkData;
			} catch ( IOException e ) {
				// A premature end of file is reported to the caller as is
				if ( e instanceof EOFException )
					throw (EOFException)e;
				LOGGER.error( "Could not read chunk " + chunk.getChunkIndex() + " of File " + getTmpFileName().toString(), e );
				return null;
			}
		}
	}

	/**
	 * Callback class for an instance of the Downloader, which supplies
	 * the Downloader with wanted file ranges, and handles incoming data.
	 * Incoming data is collected in a buffer of chunk size. When the next
	 * range is requested, the buffered chunk is passed on to
	 * {@link IncomingTransferBase#chunkReceivedInternal(FileChunk, byte[])}.
	 */
	private class CbHandler implements WantRangeCallback, DataReceivedCallback
	{
		/**
		 * The current chunk being transferred.
		 */
		private FileChunk currentChunk = null;
		/**
		 * Current buffer to receive to
		 */
		private byte[] buffer = new byte[ FileChunk.CHUNK_SIZE ];
		/**
		 * Downloader object
		 */
		private final Downloader downloader;

		private CbHandler( Downloader downloader )
		{
			this.downloader = downloader;
		}

		/**
		 * Copy a piece of received data to its place in the chunk buffer.
		 * 
		 * @param fileOffset offset in the file of the first byte in data
		 * @param dataLength number of bytes to take from data
		 * @param data the received bytes
		 * @return whether the destination file is still considered writable
		 * @throws IllegalStateException if there is no current chunk, or the
		 *            data is not within the range of the current chunk
		 */
		@Override
		public boolean dataReceived( long fileOffset, int dataLength, byte[] data )
		{
			if ( currentChunk == null )
				throw new IllegalStateException( "dataReceived without current chunk" );
			if ( !currentChunk.range.contains( fileOffset, fileOffset + dataLength ) )
				throw new IllegalStateException( "dataReceived with file data out of range" );
			System.arraycopy( data, 0, buffer, (int) ( fileOffset - currentChunk.range.startOffset ), dataLength );
			return fileWritable;
		}

		/**
		 * Pass on the chunk that was just received (if any) and pick the next
		 * missing chunk.
		 * 
		 * @return range of the next chunk to request, or null if there is
		 *         nothing more to request or the transfer had to be aborted
		 */
		@Override
		public FileRange get()
		{
			if ( currentChunk != null ) {
				boolean needNewBuffer;
				try {
					needNewBuffer = chunkReceivedInternal( currentChunk, buffer );
				} catch ( InterruptedException e3 ) {
					LOGGER.info( "Downloader was interrupted when trying to hash" );
					currentChunk = null;
					// Keep the interrupt visible to the code running this thread
					Thread.currentThread().interrupt();
					return null;
				}
				// The chunk has been passed on. Forget it before any of the early
				// returns below, so nobody mistakes it for a chunk still in transfer
				currentChunk = null;
				if ( needNewBuffer ) {
					try {
						buffer = new byte[ buffer.length ];
					} catch ( OutOfMemoryError e ) {
						// Usually catching OOM errors is a bad idea, but it's quite safe here as
						// we know exactly where it happened, no hidden sub-calls through 20 objects.
						// The most likely cause here is that the hash checker/disk cannot keep up
						// writing out completed chunks, so we just sleep a bit and try again. If it still
						// fails, we exit completely.
						try {
							Thread.sleep( 6000 );
						} catch ( InterruptedException e1 ) {
							Thread.currentThread().interrupt();
							return null;
						}
						// Might raise OOM again, but THIS TIME I MEAN IT
						try {
							buffer = new byte[ buffer.length ];
						} catch ( OutOfMemoryError e2 ) {
							LOGGER.warn( "Out of JVM memory - aborting incoming " + IncomingTransferBase.this.getId() );
							downloader.sendErrorCode( "Out of RAM" );
							cancel();
							// The old buffer is in use elsewhere and the transfer is
							// cancelled, so we must not hand out another range
							return null;
						}
					}
				}
			}
			// Get next missing chunk
			try {
				currentChunk = chunks.getMissing();
			} catch ( InterruptedException e ) {
				Thread.currentThread().interrupt();
				cancel();
				return null;
			}
			if ( currentChunk == null ) {
				return null; // No more chunks, returning null tells the Downloader we're done.
			}
			// Check remaining disk space and abort if it's too low
			if ( !hasEnoughFreeSpace() ) {
				downloader.sendErrorCode( "Out of disk space" );
				LOGGER.error( "Out of space: Cancelling upload of " + getTmpFileName().getAbsolutePath() );
				cancel();
				return null;
			}
			if ( state == TransferState.IDLE ) {
				state = TransferState.WORKING;
			}
			return currentChunk.range;
		}
	}

	/**
	 * Process a chunk that has been received completely. The
	 * {@link #chunkReceived(FileChunk, byte[])} callback is invoked first.
	 * Then, if a hash checker exists and the hash of the chunk is known, the
	 * chunk is queued for hashing. Otherwise, or if queueing did not succeed,
	 * the chunk is written to the file right away and marked as completed
	 * without hash check.
	 * 
	 * @param currentChunk the chunk that was received
	 * @param buffer buffer containing the data of the chunk
	 * @return true if buffer is used internally and should not be modified in the future, false if
	 *         reuse is safe
	 * @throws InterruptedException if interrupted while queueing the chunk for
	 *            hashing; the chunk has been written to the file before this is thrown
	 */
	final boolean chunkReceivedInternal( FileChunk currentChunk, byte[] buffer ) throws InterruptedException
	{
		boolean needNewBuffer = false;
		try {
			needNewBuffer = chunkReceived( currentChunk, buffer );
		} catch (Exception e) {
			LOGGER.warn( "Callback chunkReceived caused exception", e );
			needNewBuffer = true; // To be on the safe side
		}
		InterruptedException passEx = null;
		if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
			try {
				// Only consider the chunk handed over if the hasher really took it,
				// otherwise fall through and flush it to disk ourselves
				if ( hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH ) )
					return true;
			} catch ( InterruptedException e ) {
				passEx = e;
			}
		}
		// We have no hash checker, or hasher rejected block,
		// or the hash for the current chunk is unknown - flush to disk
		writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
		chunks.markCompleted( currentChunk, false );
		chunkStatusChanged( currentChunk );
		if ( passEx != null )
			throw passEx;
		return needNewBuffer;
	}

	/**
	 * Add a connection to this transfer and start receiving through it on a
	 * thread from the given pool. If the transfer is finished already, the
	 * connection is merely told so and closed.
	 * 
	 * @param connection the connection to receive data from
	 * @param pool thread pool used to run the connection's worker
	 * @return true if the connection was accepted; false if the transfer is
	 *         in error state, already has the maximum number of connections,
	 *         or the pool did not accept the task
	 */
	public boolean addConnection( final Downloader connection, ExecutorService pool )
	{
		if ( state == TransferState.FINISHED ) {
			// Report failure if the pool did not take the task, as nobody
			// will take care of the connection in that case
			return handleIncomingWhenFinished( connection, pool );
		}
		if ( state == TransferState.ERROR )
			return false;
		synchronized ( downloads ) {
			if ( downloads.size() >= MAX_CONNECTIONS_PER_TRANSFER )
				return false;
			downloads.add( connection );
		}
		try {
			pool.execute( new Runnable() {
				@Override
				public void run()
				{
					CbHandler cbh = new CbHandler( connection );
					try {
						if ( connection.download( cbh, cbh ) ) {
							connectFails.set( 0 );
						} else {
							connectFails.incrementAndGet();
							if ( cbh.currentChunk != null ) {
								// If the download failed and we have a current chunk, put it back into
								// the queue, so it will be handled again later...
								chunks.markFailed( cbh.currentChunk );
								// Possibly queue for local copy
								if ( localCopyManager != null && cbh.currentChunk.sha1sum != null ) {
									List<byte[]> lst = new ArrayList<>( 1 );
									lst.add( cbh.currentChunk.sha1sum );
									checkLocalCopyCandidates( lst, 0 );
								}
								chunkStatusChanged( cbh.currentChunk );
							}
							LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" );
						}
						if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
							lastActivityTime.set( System.currentTimeMillis() );
						}
					} finally {
						// Free the connection slot in any case, even if the download
						// or one of the callbacks threw, or the slot is lost for good
						synchronized ( downloads ) {
							downloads.remove( connection );
						}
					}
					if ( chunks.isComplete() ) {
						finishUploadInternal();
					} else {
						// Keep pumping unhashed chunks into the hasher
						queueUnhashedChunk( true );
						if ( localCopyManager != null ) {
							localCopyManager.trigger();
						}
					}
				}
			} );
		} catch ( Exception e ) {
			LOGGER.warn( "threadpool rejected the incoming file transfer", e );
			synchronized ( downloads ) {
				downloads.remove( connection );
			}
			return false;
		}
		if ( state == TransferState.IDLE ) {
			state = TransferState.WORKING;
		}
		return true;
	}

	/**
	 * Handle a connection that arrives after the transfer has finished:
	 * tell the remote peer that we are done and close the connection. This
	 * happens on a thread from the given pool.
	 * 
	 * @param connection the connection to close
	 * @param pool thread pool to run the task on
	 * @return true if the task was handed to the pool, false if the pool
	 *         refused it
	 */
	private boolean handleIncomingWhenFinished( final Downloader connection, ExecutorService pool )
	{
		try {
			pool.execute( new Runnable() {
				@Override
				public void run()
				{
					connection.sendDoneAndClose();
				}
			} );
		} catch ( Exception e ) {
			// Leave a trace instead of failing silently
			LOGGER.warn( "threadpool rejected closing a connection of finished transfer " + getId(), e );
			return false;
		}
		return true;
	}

	/**
	 * Write a block of data to the destination file. All file access is
	 * serialized on the file handle, so several connections may call this
	 * concurrently. If writing fails, the file is flagged as not writable,
	 * and the transfer gets cancelled.
	 * 
	 * @param fileOffset offset in the file to write to
	 * @param dataLength number of bytes to write, starting at index 0 of data
	 * @param data buffer holding the bytes to write
	 * @throws IllegalStateException if the transfer is not in state WORKING
	 */
	private void writeFileData( long fileOffset, int dataLength, byte[] data )
	{
		synchronized ( tmpFileHandle ) {
			if ( state != TransferState.WORKING ) {
				final String complaint = "Cannot write to file if state != WORKING (is " + state.toString() + ")";
				throw new IllegalStateException( complaint );
			}
			try {
				tmpFileHandle.seek( fileOffset );
				tmpFileHandle.write( data, 0, dataLength );
			} catch ( IOException e ) {
				final String problem = "Cannot write to '" + getTmpFileName()
						+ "'. Disk full, network storage error, bad permissions, ...?";
				LOGGER.error( problem, e );
				fileWritable = false;
			}
		}
		if ( fileWritable )
			return;
		cancel();
	}

	/**
	 * Callback for a finished hash check of a chunk. Ignored unless the
	 * transfer is idle or working. Afterwards, another unhashed chunk is
	 * queued for hashing if there is one, and the local copy manager (if
	 * any) is triggered.
	 * 
	 * @param result outcome of the hash check
	 * @param data data of the chunk; written to the file if the chunk passed
	 *           the check (or the check could not be run) and the chunk is
	 *           not on disk yet
	 * @param chunk the chunk that was checked
	 */
	@Override
	public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk )
	{
		if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
			LOGGER.debug( "hashCheckDone called in bad state " + state.name() );
			return;
		}
		switch ( result ) {
		case FAILURE:
			// The check itself could not be run, treat the chunk like a valid one
			LOGGER.warn( "Hash check of chunk " + chunk.toString()
					+ " could not be executed. Assuming valid :-(" );
			// Fall through
		case VALID:
			if ( chunk.isWrittenToDisk() ) {
				chunks.markCompleted( chunk, true );
			} else {
				// Data only exists in memory so far, write it out first
				try {
					writeFileData( chunk.range.startOffset, chunk.range.getLength(), data );
					chunks.markCompleted( chunk, true );
				} catch ( Exception e ) {
					LOGGER.warn( "Cannot write to file after hash check", e );
					chunks.markFailed( chunk );
				}
			}
			chunkStatusChanged( chunk );
			if ( chunks.isComplete() ) {
				finishUploadInternal();
			}
			break;
		case INVALID:
			LOGGER.warn( "Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch "
					+ chunk.getFailCount() + "x :-(" );
			chunks.markFailed( chunk );
			chunkStatusChanged( chunk );
			break;
		case NONE:
			LOGGER.warn( "Got hashCheckDone with result NONE" );
			break;
		}
		// A block finished, see if we can queue a new one
		// (non-blocking)
		queueUnhashedChunk( false );
		if ( localCopyManager != null ) {
			localCopyManager.trigger();
		}
	}

	/**
	 * Gets an unhashed chunk (if existent) and queues it for hashing.
	 * Does nothing if there is no hash checker. If the data of the chunk
	 * cannot be read back because the file is too short, the chunk is marked
	 * as failed; if it cannot be read for another reason, it is marked as valid.
	 * 
	 * @param blocking whether to pass the BLOCKING flag to the hash checker
	 *           when queueing
	 */
	protected void queueUnhashedChunk( boolean blocking )
	{
		// Without a hash checker there is nothing to queue to; return before
		// fetching a chunk, so no chunk is touched in vain
		if ( hashChecker == null )
			return;
		FileChunk chunk = chunks.getUnhashedComplete();
		if ( chunk == null )
			return;
		byte[] data;
		try {
			data = loadChunkFromFile( chunk );
		} catch ( EOFException e1 ) {
			LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking is invalid." );
			chunks.markFailed( chunk );
			chunkStatusChanged( chunk );
			return;
		}
		if ( data == null ) {
			LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" );
			chunks.markCompleted( chunk, true );
			chunkStatusChanged( chunk );
			return;
		}
		try {
			int flags = HashChecker.CALC_HASH;
			if ( blocking ) {
				flags |= HashChecker.BLOCKING;
			}
			if ( !hashChecker.queue( chunk, data, this, flags ) ) {
				// Not queued; mark as completed but not hash-checked again
				chunks.markCompleted( chunk, false );
			}
		} catch ( InterruptedException e ) {
			LOGGER.debug( "Interrupted while trying to queueUnhashedChunk" );
			chunks.markCompleted( chunk, false );
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Wrap up this transfer. Does nothing if it is finished already.
	 * Otherwise the file is extended to its final size if it is too short
	 * and the chunk list reports the last chunk as zero, the file handle is
	 * closed and the local copy manager (if any) is interrupted. If the
	 * transfer was in state WORKING, it becomes FINISHED and
	 * {@link #finishIncomingTransfer()} is called; if that returns false, or
	 * if the transfer was in any other state, the state becomes ERROR.
	 */
	final synchronized void finishUploadInternal()
	{
		if ( state == TransferState.FINISHED )
			return;
		try {
			final boolean tooShort = tmpFileHandle.length() < fileSize;
			if ( tooShort && chunks.lastChunkIsZero() ) {
				tmpFileHandle.setLength( fileSize );
			}
		} catch ( IOException e ) {
			LOGGER.warn( "Cannot extend file size to " + fileSize );
		}
		safeClose( tmpFileHandle );
		if ( localCopyManager != null ) {
			localCopyManager.interrupt();
		}
		if ( state == TransferState.WORKING ) {
			state = TransferState.FINISHED; // Races...
			final boolean accepted = finishIncomingTransfer();
			if ( !accepted ) {
				state = TransferState.ERROR;
			}
			return;
		}
		state = TransferState.ERROR;
	}
	
	/**
	 * @return the hash checker shared by all transfers; null if it could not
	 *         be created
	 */
	protected HashChecker getHashChecker()
	{
		return hashChecker;
	}

	/*
	 * Hooks to be implemented by subclasses
	 */

	/**
	 * Override this and return true if the destination of this download has
	 * still enough free space so we don't run into disk full errors.
	 * Checked every time before a new chunk is requested from the remote
	 * peer; if it returns false, the transfer is cancelled.
	 */
	protected abstract boolean hasEnoughFreeSpace();

	/**
	 * This will be called once the download is complete.
	 * The file handle used for writing has been closed before calling this.
	 * 
	 * @return false if finishing failed, which puts the transfer into error state
	 */
	protected abstract boolean finishIncomingTransfer();

	/**
	 * Called whenever a chunk has been marked as completed or failed.
	 * 
	 * @param chunk the chunk that changed
	 */
	protected abstract void chunkStatusChanged( FileChunk chunk );

	/**
	 * Called when a chunk has been received -- no validation has taken place yet.
	 * This default implementation does nothing and returns false.
	 * 
	 * @param chunk the chunk that was received
	 * @param data buffer containing the data of the chunk
	 * @return whether we want to use the buffered data later on and it must not be written to
	 */
	protected boolean chunkReceived( FileChunk chunk, byte[] data )
	{
		return false;
	}

	/**
	 * @return true if a local copy manager exists and is not paused
	 */
	public boolean isServerSideCopyingEnabled()
	{
		if ( localCopyManager == null )
			return false;
		return !localCopyManager.isPaused();
	}

	/**
	 * Resume or pause the local copy manager. Has no effect if this transfer
	 * has no local copy manager.
	 * 
	 * @param serverSideCopying true to resume local copying, false to pause it
	 */
	public void enableServerSideCopying( boolean serverSideCopying )
	{
		if ( localCopyManager == null )
			return;
		final boolean pause = !serverSideCopying;
		localCopyManager.setPaused( pause );
	}

}