diff options
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java')
-rw-r--r-- | src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java | 160 |
1 files changed, 138 insertions, 22 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 4fe6d88..c2a9443 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -9,6 +9,7 @@ import java.nio.ByteBuffer; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; @@ -19,6 +20,8 @@ import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.filetransfer.DataReceivedCallback; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.FileRange; +import org.openslx.filetransfer.LocalChunkSource; +import org.openslx.filetransfer.LocalChunkSource.ChunkSource; import org.openslx.filetransfer.WantRangeCallback; import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; import org.openslx.filetransfer.util.HashChecker.HashResult; @@ -57,17 +60,32 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H */ private boolean fileWritable = true; + /** + * Called for getting local sources for certain chunks by checksum + */ + private final LocalChunkSource localChunkSource; + + /** + * Non-null if local copying is requested + */ + private final LocalCopyManager localCopyManager; + static { long maxMem = Runtime.getRuntime().maxMemory(); if ( maxMem == Long.MAX_VALUE ) { - maxMem = 512; + LOGGER.warn( "Cannot determine maximum JVM memory -- assuming 1GB -- this might not be safe" ); + maxMem = 1024; + } else { + maxMem /= ( 1024 * 1024 ); } - int hashQueueLen = (int) ( maxMem / 100 ); + final int maxLen = Math.max( 6, Runtime.getRuntime().availableProcessors() ); + int hashQueueLen = (int) ( maxMem / 150 ); if ( hashQueueLen < 1 ) { hashQueueLen = 1; - } else if ( hashQueueLen > 6 ) { - hashQueueLen = 6; + } else if ( hashQueueLen > maxLen ) { + hashQueueLen = maxLen; } + LOGGER.debug( "Queue length: " + hashQueueLen ); HashChecker hc; try { hc = new HashChecker( "SHA-1", hashQueueLen ); @@ -79,11 +97,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H /*_*/ - public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes ) + public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes, LocalChunkSource localChunkSource ) throws FileNotFoundException { super( transferId ); this.fileSize = fileSize; + this.localChunkSource = localChunkSource; // Prepare path tmpFileName = absFilePath; tmpFileName.getParentFile().mkdirs(); @@ -96,6 +115,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H LOGGER.debug( "File " + tmpFileName + " is too long and could not be truncated" ); } chunks = new ChunkList( fileSize, blockHashes ); + if ( this.localChunkSource != null ) { + this.localCopyManager = new LocalCopyManager( this, this.chunks ); + this.localCopyManager.start(); + checkLocalCopyCandidates( blockHashes, 0 ); + } else { + this.localCopyManager = null; + } } @Override @@ -116,6 +142,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } } potentialFinishTime.set( 0 ); + if ( localCopyManager != null ) { + localCopyManager.interrupt(); + } safeClose( tmpFileHandle ); } @@ -191,14 +220,16 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H LOGGER.debug( this.getId() + ": Rejecting null block hash list" ); return; } - chunks.updateSha1Sums( hashList ); + int firstNew = chunks.updateSha1Sums( hashList ); + // No hash checker? Neither hashing nor server side dedup will make sense if ( hashChecker == null ) return; + // Check hashes of completed blocks for ( int cnt = 0; cnt < 3; ++cnt ) { FileChunk chunk = chunks.getUnhashedComplete(); if ( chunk == null ) break; - byte[] data; + byte[] data = null; try { data = loadChunkFromFile( chunk ); } catch ( EOFException e1 ) { @@ -227,6 +258,33 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return; } } + // See if we have any candidates for local copy + checkLocalCopyCandidates( hashList, firstNew ); + } + + private void checkLocalCopyCandidates( List<byte[]> hashList, int firstNew ) + { + if ( localChunkSource == null || hashList == null || hashList.isEmpty() ) + return; + List<byte[]> sums; + if ( firstNew == 0 ) { + sums = hashList; + } else { + sums = hashList.subList( firstNew, hashList.size() ); + } + if ( sums == null ) + return; + sums = Collections.unmodifiableList( sums ); + List<ChunkSource> sources = null; + try { + sources = localChunkSource.getCloneSources( sums ); + } catch ( Exception e ) { + LOGGER.warn( "Could not get chunk sources", e ); + } + if ( sources != null && !sources.isEmpty() ) { + chunks.markLocalCopyCandidates( sources ); + } + localCopyManager.trigger(); } private byte[] loadChunkFromFile( FileChunk chunk ) throws EOFException @@ -288,22 +346,14 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H { boolean needNewBuffer = false; if ( currentChunk != null ) { - needNewBuffer = chunkReceived( currentChunk, buffer ); - if ( hashChecker != null && currentChunk.getSha1Sum() != null ) { - try { - hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH ); + try { + if ( chunkReceivedInternal( currentChunk, buffer ) ) { needNewBuffer = true; - } catch ( InterruptedException e ) { - chunks.markCompleted( currentChunk, false ); - currentChunk = null; - Thread.currentThread().interrupt(); - return null; } - } else { - // We have no hash checker or the hash for the current chunk is unknown - flush to disk - writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer ); - chunks.markCompleted( currentChunk, false ); - chunkStatusChanged( currentChunk ); + } catch ( InterruptedException e3 ) { + LOGGER.info( "Downloader was interrupted when trying to hash" ); + currentChunk = null; + return null; } if ( needNewBuffer ) { try { @@ -357,6 +407,42 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } } + /** + * + * @param currentChunk + * @param buffer + * @return true if buffer is used internally and should not be modified in the future, false if + * reuse is safe + * @throws InterruptedException + */ + final boolean chunkReceivedInternal( FileChunk currentChunk, byte[] buffer ) throws InterruptedException + { + boolean needNewBuffer = false; + try { + needNewBuffer = chunkReceived( currentChunk, buffer ); + } catch (Exception e) { + LOGGER.warn( "Callback chunkReceived caused exception", e ); + needNewBuffer = true; // To be on the safe side + } + InterruptedException passEx = null; + if ( hashChecker != null && currentChunk.getSha1Sum() != null ) { + try { + hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH ); + return true; + } catch ( InterruptedException e ) { + passEx = e; + } + } + // We have no hash checker, or hasher rejected block, + // or the hash for the current chunk is unknown - flush to disk + writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer ); + chunks.markCompleted( currentChunk, false ); + chunkStatusChanged( currentChunk ); + if ( passEx != null ) + throw passEx; + return needNewBuffer; + } + public boolean addConnection( final Downloader connection, ExecutorService pool ) { if ( state == TransferState.FINISHED ) { @@ -384,6 +470,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H // If the download failed and we have a current chunk, put it back into // the queue, so it will be handled again later... chunks.markFailed( cbh.currentChunk ); + // Possibly queue for local copy + if ( localCopyManager != null && cbh.currentChunk.sha1sum != null ) { + List<byte[]> lst = new ArrayList<>( 1 ); + lst.add( cbh.currentChunk.sha1sum ); + checkLocalCopyCandidates( lst, 0 ); + } chunkStatusChanged( cbh.currentChunk ); } LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" ); @@ -399,6 +491,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } else { // Keep pumping unhashed chunks into the hasher queueUnhashedChunk( true ); + if ( localCopyManager != null ) { + localCopyManager.trigger(); + } } } } ); @@ -494,9 +589,15 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunks.markFailed( chunk ); chunkStatusChanged( chunk ); break; + case NONE: + LOGGER.warn( "Got hashCheckDone with result NONE" ); + break; } // A block finished, see if we can queue a new one queueUnhashedChunk( false ); + if ( localCopyManager != null ) { + localCopyManager.trigger(); + } } /** @@ -537,12 +638,15 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } } - private synchronized void finishUploadInternal() + final synchronized void finishUploadInternal() { if ( state == TransferState.FINISHED ) { return; } safeClose( tmpFileHandle ); + if ( localCopyManager != null ) { + localCopyManager.interrupt(); + } if ( state != TransferState.WORKING ) { state = TransferState.ERROR; } else { @@ -585,4 +689,16 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return false; } + public boolean isServerSideCopyingEnabled() + { + return localCopyManager != null && !localCopyManager.isPaused(); + } + + public void enableServerSideCopying( boolean serverSideCopying ) + { + if ( localCopyManager != null ) { + localCopyManager.setPaused( !serverSideCopying ); + } + } + } |