summaryrefslogblamecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java
blob: 8943524ddfc81064d5e8a99a383331f01a1688fd (plain) (tree)















































































































































































































                                                                                                                                   
package org.openslx.filetransfer.util;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.filetransfer.LocalChunkSource.SourceFile;
import org.openslx.util.Util;

public class LocalCopyManager extends Thread
{

	private static final Logger LOGGER = Logger.getLogger( LocalCopyManager.class );

	private FileChunk currentChunk = null;

	private final ChunkList chunkList;

	private final IncomingTransferBase transfer;

	private final Map<String, RandomAccessFile> sources = new HashMap<>();

	private Semaphore hasWork = new Semaphore( 0 );

	private AtomicInteger copyCount = new AtomicInteger();

	private boolean paused = true;

	public LocalCopyManager( IncomingTransferBase transfer, ChunkList list )
	{
		super( "LocalCopyManager" );
		this.transfer = transfer;
		this.chunkList = list;
	}

	/**
	 * Trigger copying of another block if possible
	 */
	public synchronized void trigger()
	{
		if ( this.paused )
			return;
		if ( !isAlive() ) {
			LOGGER.warn( "Cannot be triggered when Thread is not running." );
			if ( currentChunk != null ) {
				chunkList.markFailed( currentChunk );
				currentChunk = null;
			}
			return;
		}
		if ( currentChunk == null ) {
			currentChunk = chunkList.getCopyCandidate();
			hasWork.release();
		}
	}

	@Override
	public void run()
	{
		try {
			while ( !interrupted() ) {
				while ( currentChunk != null ) {
					hasWork.drainPermits();
					copyChunk();
				}
				if ( !hasWork.tryAcquire( 10, TimeUnit.SECONDS ) ) {
					if ( chunkList.isComplete() ) {
						transfer.finishUploadInternal();
						break;
					} else if ( !transfer.isActive() ) {
						break;
					} else {
						trigger();
					}
				}
			}
		} catch ( InterruptedException | IllegalStateException e ) {
			interrupt();
		}
		synchronized ( this ) {
			if ( currentChunk != null ) {
				LOGGER.warn( "Still had a chunk when thread was interrupted." );
				chunkList.markFailed( currentChunk );
				currentChunk = null;
			}
		}
		for ( RandomAccessFile file : sources.values() ) {
			Util.safeClose( file );
		}
		LOGGER.debug( "My work here is done. Copied " + copyCount.get() + " chunks from " + sources.size() + " files." );
	}

	private void copyChunk() throws InterruptedException
	{
		ChunkSource source = currentChunk.getSources();
		if ( source != null ) {
			// OK
			for ( ;; ) {
				// Try every possible source file
				SourceFile sourceFile = getOpenFile( source, currentChunk.range.getLength() );
				if ( sourceFile == null ) {
					// Was marked as having a source file, but now we got null -- most likely
					// the source file doesn't exist or isn't readable
					LOGGER.warn( "No open file for local copying!" );
					break;
				}
				// OK
				RandomAccessFile raf = sources.get( sourceFile.fileName );
				byte[] buffer;
				try {
					raf.seek( sourceFile.offset );
					// In order not to hinder (fast) upload of unknown blocks, throttle
					// local copying as long as chunks are missing - do before allocating buffer
					// so we don't hold allocated unused memory for no reason, but the seek has
					// been done so we know the file handle is not goofed up
					if ( chunkList.hasLocallyMissingChunk() ) {
						int delay;
						HashChecker hc = transfer.getHashChecker();
						if ( hc == null ) {
							delay = 50;
						} else {
							delay = ( hc.getQueueFill() * 500 ) / hc.getQueueCapacity();
						}
						Thread.sleep( delay );
					}
					buffer = new byte[ sourceFile.chunkSize ];
					raf.readFully( buffer );
				} catch ( InterruptedException e ) {
					throw e;
				} catch ( Exception e ) {
					LOGGER.warn( "Could not read chunk to replicate from " + sourceFile.fileName, e );
					buffer = null;
					if ( e instanceof IOException ) {
						// Mark file as messed up
						sources.put( sourceFile.fileName, null );
					}
				}
				if ( buffer != null ) {
					// All is well, read chunk locally, pass on
					transfer.chunkReceivedInternal( currentChunk, buffer );
					synchronized ( this ) {
						currentChunk = null;
					}
					copyCount.incrementAndGet();
					trigger();
					return;
				}
				// Reaching here means failure
				// We'll keep looping as long as there are source files available
			}
			// End of loop over source files
		}
		// FAILED
		LOGGER.info( "Local copying failed, queueing for normal upload..." );
		synchronized ( this ) {
			chunkList.markFailed( currentChunk );
			currentChunk = null;
		}
	}

	private SourceFile getOpenFile( ChunkSource source, int requiredSize )
	{
		for ( SourceFile candidate : source.sourceCandidates ) {
			if ( sources.get( candidate.fileName ) != null )
				return candidate;
		}
		// Have to open
		for ( SourceFile candidate : source.sourceCandidates ) {
			if ( sources.containsKey( candidate.fileName ) ) // Maps to null (otherwise upper loop would have returned)
				continue; // File is broken, don't use
			if ( candidate.chunkSize != requiredSize )
				continue;
			File f = new File( candidate.fileName );
			if ( !f.exists() ) {
				sources.put( candidate.fileName, null ); // Mark for future
				continue;
			}
			try {
				RandomAccessFile raf = new RandomAccessFile( f, "r" );
				sources.put( candidate.fileName, raf );
				return candidate;
			} catch ( Exception e ) {
				LOGGER.info( "Cannot open " + candidate.fileName, e );
				sources.put( candidate.fileName, null ); // Mark for future
			}
		}
		// Nothing worked
		return null;
	}

	public boolean isPaused()
	{
		return paused;
	}

	public void setPaused( boolean paused )
	{
		this.paused = paused;
	}

}