Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java')
-rw-r--r--  src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java  160
1 file changed, 138 insertions(+), 22 deletions(-)
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index 4fe6d88..c2a9443 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -9,6 +9,7 @@ import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -19,6 +20,8 @@ import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
+import org.openslx.filetransfer.LocalChunkSource;
+import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
import org.openslx.filetransfer.util.HashChecker.HashResult;
@@ -57,17 +60,32 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
*/
private boolean fileWritable = true;
+ /**
+ * Queried to look up local sources for certain chunks, by checksum
+ */
+ private final LocalChunkSource localChunkSource;
+
+ /**
+ * Non-null if local copying is requested
+ */
+ private final LocalCopyManager localCopyManager;
+
static {
long maxMem = Runtime.getRuntime().maxMemory();
if ( maxMem == Long.MAX_VALUE ) {
- maxMem = 512;
+ LOGGER.warn( "Cannot determine maximum JVM memory -- assuming 1GB -- this might not be safe" );
+ maxMem = 1024;
+ } else {
+ maxMem /= ( 1024 * 1024 );
}
- int hashQueueLen = (int) ( maxMem / 100 );
+ final int maxLen = Math.max( 6, Runtime.getRuntime().availableProcessors() );
+ int hashQueueLen = (int) ( maxMem / 150 );
if ( hashQueueLen < 1 ) {
hashQueueLen = 1;
- } else if ( hashQueueLen > 6 ) {
- hashQueueLen = 6;
+ } else if ( hashQueueLen > maxLen ) {
+ hashQueueLen = maxLen;
}
+ LOGGER.debug( "Queue length: " + hashQueueLen );
HashChecker hc;
try {
hc = new HashChecker( "SHA-1", hashQueueLen );
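For reference, the new sizing heuristic in isolation: one hash-queue slot per ~150 MiB of heap, clamped between 1 and max(6, available cores). A minimal standalone sketch of the same logic (the method name is illustrative, not part of the patch):

    // Sketch of the queue-length heuristic introduced above.
    static int hashQueueLength() {
        long maxMem = Runtime.getRuntime().maxMemory();
        if ( maxMem == Long.MAX_VALUE ) {
            maxMem = 1024; // limit unknown: assume 1 GiB
        } else {
            maxMem /= ( 1024 * 1024 ); // bytes to MiB
        }
        int maxLen = Math.max( 6, Runtime.getRuntime().availableProcessors() );
        // One slot per ~150 MiB of heap, clamped to [1, maxLen]
        return Math.min( Math.max( (int) ( maxMem / 150 ), 1 ), maxLen );
    }

For example, a 4 GiB heap on an 8-core machine gives min(max(27, 1), 8) = 8 queued buffers.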
@@ -79,11 +97,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
/*_*/
- public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes )
+ public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes, LocalChunkSource localChunkSource )
throws FileNotFoundException
{
super( transferId );
this.fileSize = fileSize;
+ this.localChunkSource = localChunkSource;
// Prepare path
tmpFileName = absFilePath;
tmpFileName.getParentFile().mkdirs();
@@ -96,6 +115,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
LOGGER.debug( "File " + tmpFileName + " is too long and could not be truncated" );
}
chunks = new ChunkList( fileSize, blockHashes );
+ if ( this.localChunkSource != null ) {
+ this.localCopyManager = new LocalCopyManager( this, this.chunks );
+ this.localCopyManager.start();
+ checkLocalCopyCandidates( blockHashes, 0 );
+ } else {
+ this.localCopyManager = null;
+ }
}
@Override
@@ -116,6 +142,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
}
potentialFinishTime.set( 0 );
+ if ( localCopyManager != null ) {
+ localCopyManager.interrupt();
+ }
safeClose( tmpFileHandle );
}
@@ -191,14 +220,16 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
LOGGER.debug( this.getId() + ": Rejecting null block hash list" );
return;
}
- chunks.updateSha1Sums( hashList );
+ int firstNew = chunks.updateSha1Sums( hashList );
+ // No hash checker? Then neither hashing nor server-side dedup makes sense
if ( hashChecker == null )
return;
+ // Check hashes of completed blocks
for ( int cnt = 0; cnt < 3; ++cnt ) {
FileChunk chunk = chunks.getUnhashedComplete();
if ( chunk == null )
break;
- byte[] data;
+ byte[] data = null;
try {
data = loadChunkFromFile( chunk );
} catch ( EOFException e1 ) {
@@ -227,6 +258,33 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
return;
}
}
+ // See if we have any candidates for local copy
+ checkLocalCopyCandidates( hashList, firstNew );
+ }
+
+ private void checkLocalCopyCandidates( List<byte[]> hashList, int firstNew )
+ {
+ if ( localChunkSource == null || hashList == null || hashList.isEmpty() )
+ return;
+ List<byte[]> sums;
+ if ( firstNew == 0 ) {
+ sums = hashList;
+ } else {
+ sums = hashList.subList( firstNew, hashList.size() );
+ }
+ if ( sums == null )
+ return;
+ sums = Collections.unmodifiableList( sums );
+ List<ChunkSource> sources = null;
+ try {
+ sources = localChunkSource.getCloneSources( sums );
+ } catch ( Exception e ) {
+ LOGGER.warn( "Could not get chunk sources", e );
+ }
+ if ( sources != null && !sources.isEmpty() ) {
+ chunks.markLocalCopyCandidates( sources );
+ }
+ localCopyManager.trigger();
}
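For context, the only contract the base class relies on here: getCloneSources() receives a read-only list of SHA-1 block hashes and returns chunks that can be served from files already present on this server. A minimal sketch, assuming LocalChunkSource is an interface with just that method (it uses only types this file already imports):

    // No-op source: never offers local copies, so every chunk is downloaded.
    LocalChunkSource noopSource = new LocalChunkSource() {
        @Override
        public List<ChunkSource> getCloneSources( List<byte[]> sums ) {
            // A real implementation would match the sums against a database
            // of locally stored images and return the file regions found.
            return Collections.emptyList();
        }
    };

Returning an empty list (or null) simply means no candidates: markLocalCopyCandidates() is never called and the transfer proceeds as a plain download.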
private byte[] loadChunkFromFile( FileChunk chunk ) throws EOFException
@@ -288,22 +346,14 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
{
boolean needNewBuffer = false;
if ( currentChunk != null ) {
- needNewBuffer = chunkReceived( currentChunk, buffer );
- if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
- try {
- hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH );
+ try {
+ if ( chunkReceivedInternal( currentChunk, buffer ) ) {
needNewBuffer = true;
- } catch ( InterruptedException e ) {
- chunks.markCompleted( currentChunk, false );
- currentChunk = null;
- Thread.currentThread().interrupt();
- return null;
}
- } else {
- // We have no hash checker or the hash for the current chunk is unknown - flush to disk
- writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
- chunks.markCompleted( currentChunk, false );
- chunkStatusChanged( currentChunk );
+ } catch ( InterruptedException e3 ) {
+ LOGGER.info( "Downloader was interrupted when trying to hash" );
+ currentChunk = null;
+ return null;
}
if ( needNewBuffer ) {
try {
@@ -357,6 +407,42 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
}
+ /**
+ * Handle a completely received chunk: run the chunkReceived callback, then
+ * either queue the buffer for hashing or flush it to disk.
+ *
+ * @param currentChunk the chunk the received buffer belongs to
+ * @param buffer raw data of the chunk
+ * @return true if buffer is used internally and should not be modified in the future, false if
+ * reuse is safe
+ * @throws InterruptedException if interrupted while queueing the chunk for hashing; the
+ * buffer is still flushed to disk before the exception is re-thrown
+ */
+ final boolean chunkReceivedInternal( FileChunk currentChunk, byte[] buffer ) throws InterruptedException
+ {
+ boolean needNewBuffer = false;
+ try {
+ needNewBuffer = chunkReceived( currentChunk, buffer );
+ } catch ( Exception e ) {
+ LOGGER.warn( "Callback chunkReceived caused exception", e );
+ needNewBuffer = true; // To be on the safe side
+ }
+ InterruptedException passEx = null;
+ if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
+ try {
+ hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH );
+ return true;
+ } catch ( InterruptedException e ) {
+ passEx = e;
+ }
+ }
+ // We have no hash checker, the hasher rejected the block,
+ // or the hash for the current chunk is unknown - flush to disk
+ writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
+ chunks.markCompleted( currentChunk, false );
+ chunkStatusChanged( currentChunk );
+ if ( passEx != null )
+ throw passEx;
+ return needNewBuffer;
+ }
+
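The return value encodes buffer ownership: once the buffer has been queued with the hash checker, the downloader must not write into it again. A hypothetical caller illustrating that contract (the helper name is not from the patch):

    // Returns the buffer to use for the next chunk.
    byte[] handleReceived( FileChunk chunk, byte[] buffer ) throws InterruptedException {
        if ( chunkReceivedInternal( chunk, buffer ) ) {
            // Buffer was handed to the hash checker - never touch it again
            return new byte[ buffer.length ];
        }
        return buffer; // safe to reuse
    }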
public boolean addConnection( final Downloader connection, ExecutorService pool )
{
if ( state == TransferState.FINISHED ) {
@@ -384,6 +470,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
// If the download failed and we have a current chunk, put it back into
// the queue, so it will be handled again later...
chunks.markFailed( cbh.currentChunk );
+ // Possibly queue for local copy
+ if ( localCopyManager != null && cbh.currentChunk.sha1sum != null ) {
+ List<byte[]> lst = new ArrayList<>( 1 );
+ lst.add( cbh.currentChunk.sha1sum );
+ checkLocalCopyCandidates( lst, 0 );
+ }
chunkStatusChanged( cbh.currentChunk );
}
LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" );
@@ -399,6 +491,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
} else {
// Keep pumping unhashed chunks into the hasher
queueUnhashedChunk( true );
+ if ( localCopyManager != null ) {
+ localCopyManager.trigger();
+ }
}
}
} );
@@ -494,9 +589,15 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
chunks.markFailed( chunk );
chunkStatusChanged( chunk );
break;
+ case NONE:
+ LOGGER.warn( "Got hashCheckDone with result NONE" );
+ break;
}
// A block finished, see if we can queue a new one
queueUnhashedChunk( false );
+ if ( localCopyManager != null ) {
+ localCopyManager.trigger();
+ }
}
/**
@@ -537,12 +638,15 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
}
- private synchronized void finishUploadInternal()
+ final synchronized void finishUploadInternal()
{
if ( state == TransferState.FINISHED ) {
return;
}
safeClose( tmpFileHandle );
+ if ( localCopyManager != null ) {
+ localCopyManager.interrupt();
+ }
if ( state != TransferState.WORKING ) {
state = TransferState.ERROR;
} else {
@@ -585,4 +689,16 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
return false;
}
+ public boolean isServerSideCopyingEnabled()
+ {
+ return localCopyManager != null && !localCopyManager.isPaused();
+ }
+
+ public void enableServerSideCopying( boolean serverSideCopying )
+ {
+ if ( localCopyManager != null ) {
+ localCopyManager.setPaused( !serverSideCopying );
+ }
+ }
+
}
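Finally, a usage sketch for the new toggle; the helper and its call site are illustrative only:

    // Temporarily suspend server-side copying for a running transfer.
    // Safe even when the transfer was created without a LocalChunkSource:
    // enableServerSideCopying() is a no-op when localCopyManager is null.
    static void pauseCloning( IncomingTransferBase transfer ) {
        if ( transfer.isServerSideCopyingEnabled() ) {
            transfer.enableServerSideCopying( false );
        }
    }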