package org.openslx.filetransfer.util;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.filetransfer.LocalChunkSource.SourceFile;
import org.openslx.util.Util;
public class LocalCopyManager extends Thread
{
private static final Logger LOGGER = LogManager.getLogger( LocalCopyManager.class );
private FileChunk currentChunk = null;
private final ChunkList chunkList;
private final IncomingTransferBase transfer;
private final Map<String, RandomAccessFile> sources = new HashMap<>();
private Semaphore hasWork = new Semaphore( 0 );
private AtomicInteger copyCount = new AtomicInteger();
private boolean paused = true;
public LocalCopyManager( IncomingTransferBase transfer, ChunkList list )
{
super( "LocalCopyManager" );
this.transfer = transfer;
this.chunkList = list;
}
/**
* Trigger copying of another block if possible
*/
public synchronized void trigger()
{
if ( this.paused )
return;
if ( this.getState() == State.NEW ) {
start();
}
triggerInternal();
}
private synchronized void triggerInternal()
{
if ( this.paused )
return;
if ( !isAlive() ) {
LOGGER.warn( "Cannot be triggered when Thread is not running." );
if ( currentChunk != null ) {
chunkList.markFailed( currentChunk );
currentChunk = null;
}
return;
}
if ( currentChunk == null ) {
currentChunk = chunkList.getCopyCandidate();
hasWork.release();
}
}
@Override
public void run()
{
try {
while ( !interrupted() ) {
while ( currentChunk != null ) {
hasWork.drainPermits();
copyChunk();
}
if ( !hasWork.tryAcquire( 10, TimeUnit.SECONDS ) ) {
if ( chunkList.isComplete() ) {
transfer.finishUploadInternal();
break;
} else if ( !transfer.isActive() ) {
break;
} else {
triggerInternal();
}
}
}
} catch ( InterruptedException | IllegalStateException e ) {
interrupt();
}
synchronized ( this ) {
if ( currentChunk != null ) {
LOGGER.warn( "Still had a chunk when thread was interrupted." );
chunkList.markFailed( currentChunk );
currentChunk = null;
}
}
for ( RandomAccessFile file : sources.values() ) {
Util.safeClose( file );
}
LOGGER.debug( "My work here is done. Copied " + copyCount.get() + " chunks from " + sources.size() + " files." );
}
private void copyChunk() throws InterruptedException
{
ChunkSource source = currentChunk.getSources();
if ( source != null ) {
// OK
for ( ;; ) {
// Try every possible source file
SourceFile sourceFile = getOpenFile( source, currentChunk.range.getLength() );
if ( sourceFile == null ) {
// Was marked as having a source file, but now we got null -- most likely
// the source file doesn't exist or isn't readable
LOGGER.warn( "No open file for local copying!" );
break;
}
// OK
RandomAccessFile raf = sources.get( sourceFile.fileName );
byte[] buffer;
try {
raf.seek( sourceFile.offset );
// In order not to hinder (fast) upload of unknown blocks, throttle
// local copying as long as chunks are missing - do before allocating buffer
// so we don't hold allocated unused memory for no reason, but the seek has
// been done so we know the file handle is not goofed up
if ( chunkList.hasLocallyMissingChunk() ) {
int delay;
HashChecker hc = transfer.getHashChecker();
if ( hc == null ) {
delay = 50;
} else {
delay = ( hc.getQueueFill() * 500 ) / hc.getQueueCapacity();
}
Thread.sleep( delay );
}
buffer = new byte[ sourceFile.chunkSize ];
raf.readFully( buffer );
} catch ( InterruptedException e ) {
throw e;
} catch ( Exception e ) {
LOGGER.warn( "Could not read chunk to replicate from " + sourceFile.fileName, e );
buffer = null;
if ( e instanceof IOException ) {
// Mark file as messed up
sources.put( sourceFile.fileName, null );
}
}
if ( buffer != null ) {
// All is well, read chunk locally, pass on
transfer.chunkReceivedInternal( currentChunk, buffer );
synchronized ( this ) {
currentChunk = null;
}
copyCount.incrementAndGet();
triggerInternal();
return;
}
// Reaching here means failure
// We'll keep looping as long as there are source files available
}
// End of loop over source files
}
// FAILED
LOGGER.info( "Local copying failed, queueing for normal upload..." );
synchronized ( this ) {
chunkList.markFailed( currentChunk );
currentChunk = null;
}
}
private SourceFile getOpenFile( ChunkSource source, int requiredSize )
{
for ( SourceFile candidate : source.sourceCandidates ) {
if ( sources.get( candidate.fileName ) != null )
return candidate;
}
// Have to open
for ( SourceFile candidate : source.sourceCandidates ) {
if ( sources.containsKey( candidate.fileName ) ) // Maps to null (otherwise upper loop would have returned)
continue; // File is broken, don't use
if ( candidate.chunkSize != requiredSize )
continue;
File f = new File( candidate.fileName );
if ( !f.exists() ) {
sources.put( candidate.fileName, null ); // Mark for future
continue;
}
try {
RandomAccessFile raf = new RandomAccessFile( f, "r" );
sources.put( candidate.fileName, raf );
return candidate;
} catch ( Exception e ) {
LOGGER.info( "Cannot open " + candidate.fileName, e );
sources.put( candidate.fileName, null ); // Mark for future
}
}
// Nothing worked
return null;
}
public boolean isPaused()
{
return paused;
}
public void setPaused( boolean paused )
{
this.paused = paused;
}
}