path: root/src/main/java/org/openslx/filetransfer
author     Simon Rettberg  2018-05-11 17:35:51 +0200
committer  Simon Rettberg  2018-05-11 17:35:51 +0200
commit     8cf60948213a141b86e9a7128359545040f97276 (patch)
tree       20662a196e92717b2b1147c586b946472e9471d1 /src/main/java/org/openslx/filetransfer
parent     Do what the javadoc says... (diff)
Support copying existing chunks server side
Can speed up uploads if the storage backend is fast enough.
Diffstat (limited to 'src/main/java/org/openslx/filetransfer')
-rw-r--r--  src/main/java/org/openslx/filetransfer/LocalChunkSource.java          |  42
-rw-r--r--  src/main/java/org/openslx/filetransfer/util/ChunkList.java            |  87
-rw-r--r--  src/main/java/org/openslx/filetransfer/util/FileChunk.java            |  41
-rw-r--r--  src/main/java/org/openslx/filetransfer/util/HashChecker.java          |  45
-rw-r--r--  src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java | 160
-rw-r--r--  src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java     | 208
6 files changed, 537 insertions(+), 46 deletions(-)
diff --git a/src/main/java/org/openslx/filetransfer/LocalChunkSource.java b/src/main/java/org/openslx/filetransfer/LocalChunkSource.java
new file mode 100644
index 0000000..c6f5fc3
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/LocalChunkSource.java
@@ -0,0 +1,42 @@
+package org.openslx.filetransfer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public interface LocalChunkSource
+{
+
+ public static class ChunkSource
+ {
+ public final List<SourceFile> sourceCandidates;
+ public final byte[] sha1sum;
+
+ public ChunkSource( byte[] sha1sum )
+ {
+ this.sha1sum = sha1sum;
+ this.sourceCandidates = new ArrayList<>();
+ }
+
+ public void addFile( String file, long offset, int size )
+ {
+ this.sourceCandidates.add( new SourceFile( file, offset, size ) );
+ }
+ }
+
+ public List<ChunkSource> getCloneSources( List<byte[]> sums );
+
+ public static class SourceFile
+ {
+ public final String fileName;
+ public final long offset;
+ public final int chunkSize;
+
+ public SourceFile( String file, long offset, int size )
+ {
+ this.fileName = file;
+ this.offset = offset;
+ this.chunkSize = size;
+ }
+ }
+
+}
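
[Editor's note] The interface above is the only hook a storage server implements to enable server-side chunk copying. A minimal sketch of an implementation follows; the class name MapChunkSource, the in-memory map, and the remember() helper are illustrative only -- a real server would query its block-hash database in getCloneSources().

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.openslx.filetransfer.LocalChunkSource;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.filetransfer.LocalChunkSource.SourceFile;

public class MapChunkSource implements LocalChunkSource
{
	// hex sha1 -> location of a chunk with that hash already on disk
	private final Map<String, SourceFile> known = new HashMap<>();

	public void remember( byte[] sha1sum, String file, long offset, int size )
	{
		known.put( hex( sha1sum ), new SourceFile( file, offset, size ) );
	}

	@Override
	public List<ChunkSource> getCloneSources( List<byte[]> sums )
	{
		List<ChunkSource> result = new ArrayList<>();
		for ( byte[] sum : sums ) {
			if ( sum == null )
				continue;
			SourceFile sf = known.get( hex( sum ) );
			if ( sf == null )
				continue; // no local copy of this chunk known
			ChunkSource cs = new ChunkSource( sum );
			cs.addFile( sf.fileName, sf.offset, sf.chunkSize );
			result.add( cs );
		}
		return result;
	}

	private static String hex( byte[] sum )
	{
		StringBuilder sb = new StringBuilder( sum.length * 2 );
		for ( byte b : sum )
			sb.append( String.format( "%02x", b & 0xff ) );
		return sb.toString();
	}
}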
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index cd1bc69..11f64e8 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -11,6 +11,7 @@ import java.util.List;
import java.util.zip.CRC32;
import org.apache.log4j.Logger;
+import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.util.ThriftUtil;
public class ChunkList
@@ -26,7 +27,7 @@ public class ChunkList
/**
* Chunks that are missing from the file
*/
- private final List<FileChunk> missingChunks = new LinkedList<>();
+ private final LinkedList<FileChunk> missingChunks = new LinkedList<>();
/**
* Chunks that are currently being uploaded or hash-checked
@@ -58,21 +59,26 @@ public class ChunkList
* upload, but periodically update it while the upload is running.
*
* @param sha1Sums list of sums
+ * @return lowest index of chunk that didn't have a sha1sum before, -1 if no new ones
*/
- public synchronized void updateSha1Sums( List<byte[]> sha1Sums )
+ public synchronized int updateSha1Sums( List<byte[]> sha1Sums )
{
int index = 0;
+ int firstNew = -1;
for ( byte[] sum : sha1Sums ) {
if ( index >= allChunks.size() )
break;
if ( sum != null ) {
- allChunks.get( index ).setSha1Sum( sum );
+ if ( allChunks.get( index ).setSha1Sum( sum ) && firstNew == -1 ) {
+ firstNew = index;
+ }
if ( !hasChecksum ) {
hasChecksum = true;
}
}
index++;
}
+ return firstNew;
}
/**
@@ -120,13 +126,84 @@ public class ChunkList
if ( missingChunks.isEmpty() )
return null;
}
- FileChunk c = missingChunks.remove( 0 );
+ FileChunk c = missingChunks.removeFirst();
c.setStatus( ChunkStatus.UPLOADING );
pendingChunks.add( c );
return c;
}
/**
+ * Returns true if this list contains a chunk with state MISSING,
+ * meaning the chunk's sha1 is not known to exist in another image.
+ * @return true if a chunk with status MISSING exists
+ */
+ public synchronized boolean hasLocallyMissingChunk()
+ {
+ return !missingChunks.isEmpty() && missingChunks.peekFirst().status == ChunkStatus.MISSING;
+ }
+
+ /**
+ * Get a chunk that is marked as candidate for copying.
+ * Returns null if none are available.
+ */
+ public synchronized FileChunk getCopyCandidate()
+ {
+ if ( missingChunks.isEmpty() )
+ return null;
+ FileChunk last = missingChunks.removeLast();
+ if ( last.status != ChunkStatus.QUEUED_FOR_COPY ) {
+ // Put back
+ missingChunks.add( last );
+ return null;
+ }
+ // Is a candidate
+ last.setStatus( ChunkStatus.COPYING );
+ pendingChunks.add( last );
+ return last;
+ }
+
+ /**
+ * Mark the given chunks as candidates for local copying, instead of
+ * receiving them from the peer.
+ * @param sources possible local sources for the chunks we still need
+ */
+ public synchronized void markLocalCopyCandidates( List<ChunkSource> sources )
+ {
+ for ( ChunkSource src : sources ) {
+ try {
+ if ( src.sourceCandidates.isEmpty() )
+ continue;
+ List<FileChunk> append = null;
+ for ( Iterator<FileChunk> it = missingChunks.iterator(); it.hasNext(); ) {
+ FileChunk chunk = it.next();
+ if ( !Arrays.equals( chunk.sha1sum, src.sha1sum ) )
+ continue;
+ if ( chunk.status == ChunkStatus.QUEUED_FOR_COPY )
+ continue;
+ // Bingo
+ if ( append == null ) {
+ append = new ArrayList<>( 20 );
+ }
+ it.remove();
+ chunk.setStatus( ChunkStatus.QUEUED_FOR_COPY );
+ chunk.setSource( src );
+ append.add( chunk );
+ }
+ if ( append != null ) {
+ // Move all the chunks queued for copying to the end of the list, so when
+ // getMissing() hands out a chunk for upload from the client, these will
+ // only come last, in case reading from storage and writing back is really slow
+ missingChunks.addAll( append );
+ }
+ } catch ( Exception e ) {
+ LOGGER.warn( "chunk clone list if messed up", e );
+ }
+ }
+ }
+
+ /**
* Get the block status as byte representation.
*/
public synchronized ByteBuffer getStatusArray()
@@ -235,7 +312,7 @@ public class ChunkList
}
// Add as first element so it will be re-transmitted immediately
c.setStatus( ChunkStatus.MISSING );
- missingChunks.add( 0, c );
+ missingChunks.addFirst( c );
this.notifyAll();
return c.incFailed();
}
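
[Editor's note] The changes above turn missingChunks into a double-ended queue: getMissing() serves uploads from the head, markLocalCopyCandidates() moves copyable chunks to the tail, getCopyCandidate() consumes from the tail, and markFailed() re-inserts at the head. A standalone sketch of that ordering (class and chunk names are illustrative):

import java.util.LinkedList;

public class DequeOrderingDemo
{
	public static void main( String[] args )
	{
		LinkedList<String> missing = new LinkedList<>();
		missing.add( "chunk0 MISSING" );
		missing.add( "chunk1 MISSING" );
		// markLocalCopyCandidates(): matched chunk moves to the tail
		missing.add( "chunk2 QUEUED_FOR_COPY" );
		// getMissing(): uploads take from the head
		System.out.println( "upload: " + missing.removeFirst() ); // chunk0
		// getCopyCandidate(): local copying takes from the tail, and only
		// if the tail element is actually queued for copying
		System.out.println( "copy:   " + missing.removeLast() ); // chunk2
		// markFailed(): failed chunks return to the head for immediate retry
		missing.addFirst( "chunk0 MISSING (retry)" );
		System.out.println( "retry:  " + missing.removeFirst() );
	}
}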
diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
index 6450af2..f302b3c 100644
--- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java
+++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
@@ -4,11 +4,15 @@ import java.util.Iterator;
import java.util.List;
import java.util.zip.CRC32;
+import org.apache.log4j.Logger;
import org.openslx.filetransfer.FileRange;
+import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
public class FileChunk
{
+ private static final Logger LOGGER = Logger.getLogger( FileChunk.class );
+
/**
* Length in bytes of binary sha1 representation
*/
@@ -22,6 +26,7 @@ public class FileChunk
protected CRC32 crc32;
protected ChunkStatus status = ChunkStatus.MISSING;
private boolean writtenToDisk = false;
+ private ChunkSource localSource = null;
public FileChunk( long startOffset, long endOffset, byte[] sha1sum )
{
@@ -33,14 +38,15 @@ public class FileChunk
}
}
- synchronized void setSha1Sum( byte[] sha1sum )
+ synchronized boolean setSha1Sum( byte[] sha1sum )
{
if ( this.sha1sum != null || sha1sum == null || sha1sum.length != SHA1_LENGTH )
- return;
+ return false;
this.sha1sum = sha1sum;
if ( this.status == ChunkStatus.COMPLETE ) {
this.status = ChunkStatus.HASHING;
}
+ return true;
}
/**
@@ -79,19 +85,29 @@ public class FileChunk
{
// As this is usually called before we validated the sha1, handle the case where
// this gets called multiple times and only remember the last result
+ long old = Long.MAX_VALUE;
if ( crc32 == null ) {
crc32 = new CRC32();
} else {
+ LOGGER.info( "Redoing CRC32 of Chunk " + getChunkIndex() );
+ old = crc32.getValue();
crc32.reset();
}
- int chunkLength = range.getLength();
- crc32.update( data, 0, chunkLength );
- if ( ( chunkLength % 4096 ) != 0 ) {
+ int expectedLength = range.getLength();
+ if ( expectedLength > data.length ) {
+ LOGGER.error( "Chunk #" + getChunkIndex() + ": " + data.length + " instead of " + expectedLength + " for " + getChunkIndex() );
+ }
+ crc32.update( data, 0, expectedLength );
+ if ( ( expectedLength % 4096 ) != 0 ) {
// DNBD3 virtually pads all images to be a multiple of 4KiB in size,
// so simulate that here too
- byte[] padding = new byte[ 4096 - ( chunkLength % 4096 ) ];
+ LOGGER.debug( "Block " + getChunkIndex() + " not multiple of 4k." );
+ byte[] padding = new byte[ 4096 - ( expectedLength % 4096 ) ];
crc32.update( padding );
}
+ if ( old != Long.MAX_VALUE && old != crc32.getValue() ) {
+ LOGGER.warn( String.format( "Changed from %x to %x", old, crc32.getValue() ) );
+ }
}
public synchronized void getCrc32Le( byte[] buffer, int offset )
@@ -119,7 +135,7 @@ public class FileChunk
if ( status != null ) {
if ( status == ChunkStatus.COMPLETE ) {
this.writtenToDisk = true;
- } else if ( status == ChunkStatus.MISSING ) {
+ } else if ( status == ChunkStatus.MISSING || status == ChunkStatus.QUEUED_FOR_COPY ) {
this.writtenToDisk = false;
}
this.status = status;
@@ -161,4 +177,15 @@ public class FileChunk
{
return failCount;
}
+
+ public void setSource( ChunkSource src )
+ {
+ this.localSource = src;
+ }
+
+ public ChunkSource getSources()
+ {
+ return this.localSource;
+ }
+
}
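
[Editor's note] For reference, the CRC32 computed in calculateDnbd3Crc32() covers the chunk data zero-padded to the next 4 KiB boundary, since DNBD3 virtually pads images to a multiple of 4 KiB. A self-contained sketch of the same calculation (class and method names are illustrative):

import java.util.zip.CRC32;

public class Dnbd3Crc32Demo
{
	// CRC32 over the chunk data, zero-padded to the next 4 KiB boundary
	public static long paddedCrc32( byte[] data, int length )
	{
		CRC32 crc = new CRC32();
		crc.update( data, 0, length );
		int rem = length % 4096;
		if ( rem != 0 ) {
			crc.update( new byte[ 4096 - rem ] ); // simulated DNBD3 padding
		}
		return crc.getValue();
	}

	public static void main( String[] args )
	{
		byte[] data = new byte[ 5000 ]; // not a multiple of 4096
		// Checksum equals hashing the same data padded out to 8192 bytes
		System.out.printf( "crc32 = %08x%n", paddedCrc32( data, data.length ) );
	}
}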
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
index 273bc7e..2c404db 100644
--- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java
+++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
@@ -7,6 +7,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
@@ -24,7 +25,9 @@ public class HashChecker
private final String algorithm;
- private volatile boolean invalid = false;
+ private boolean invalid = false;
+
+ private final int queueCapacity;
public HashChecker( String algorithm ) throws NoSuchAlgorithmException
{
@@ -34,6 +37,7 @@ public class HashChecker
public HashChecker( String algorithm, int queueLen ) throws NoSuchAlgorithmException
{
this.algorithm = algorithm;
+ this.queueCapacity = queueLen;
this.queue = new LinkedBlockingQueue<>( queueLen );
CheckThread thread = new CheckThread( false );
thread.start();
@@ -44,6 +48,7 @@ public class HashChecker
{
synchronized ( threads ) {
threads.remove( thread );
+ LOGGER.debug( "Check threads: " + threads.size() );
if ( thread.extraThread )
return;
invalid = true;
@@ -109,6 +114,7 @@ public class HashChecker
CheckThread thread = new CheckThread( true );
thread.start();
threads.add( thread );
+ LOGGER.debug( "Check threads: " + threads.size() );
} catch ( Exception e ) {
LOGGER.warn( "Could not create additional hash checking thread", e );
}
@@ -127,6 +133,19 @@ public class HashChecker
return true;
}
+ /**
+ * Get number of chunks currently waiting for a worker thread.
+ */
+ public int getQueueFill()
+ {
+ return queue.size();
+ }
+
+ public int getQueueCapacity()
+ {
+ return queueCapacity;
+ }
+
// ############################################################# \\
private class CheckThread extends Thread
@@ -156,16 +175,23 @@ public class HashChecker
HashTask task;
// Wait for work
try {
- task = queue.take();
- if ( task == null )
- continue;
+ if ( extraThread ) {
+ task = queue.poll( 30, TimeUnit.SECONDS );
+ if ( task == null ) {
+ break;
+ }
+ } else {
+ task = queue.take(); // blocks; take() never returns null
+ }
} catch ( InterruptedException e ) {
LOGGER.info( "Interrupted while waiting for hash task", e );
break;
}
HashResult result = HashResult.NONE;
if ( task.doHash ) {
- // Calculate digest
+ // Calculate digest
md.update( task.data, 0, task.chunk.range.getLength() );
byte[] digest = md.digest();
result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
@@ -175,14 +201,9 @@ public class HashChecker
task.chunk.calculateDnbd3Crc32( task.data );
}
execCallback( task, result );
- if ( extraThread && queue.isEmpty() ) {
- break;
- }
}
- if ( extraThread ) {
- LOGGER.info( "Stopped additional hash checker" );
- } else {
- LOGGER.info( "Stopped MAIN hash checker" );
+ if ( !extraThread ) {
+ LOGGER.warn( "Stopped MAIN hash checker" );
}
threadFailed( this );
}
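
[Editor's note] The poll-with-timeout change above lets additional hash threads linger for up to 30 seconds of idleness instead of dying as soon as the queue is momentarily empty. A minimal sketch of the pattern, detached from HashChecker (all names illustrative):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class IdleTimeoutWorkerDemo
{
	private static final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>( 16 );

	// Permanent workers block forever; extra workers exit after 30s idle
	static void startWorker( final boolean extraThread )
	{
		new Thread( () -> {
			for ( ;; ) {
				Runnable task;
				try {
					if ( extraThread ) {
						task = queue.poll( 30, TimeUnit.SECONDS );
						if ( task == null )
							break; // idle too long, let this helper die
					} else {
						task = queue.take();
					}
				} catch ( InterruptedException e ) {
					break;
				}
				task.run();
			}
		} ).start();
	}

	public static void main( String[] args ) throws InterruptedException
	{
		startWorker( true );
		queue.put( () -> System.out.println( "hashing one chunk" ) );
	}
}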
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index 4fe6d88..c2a9443 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -9,6 +9,7 @@ import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -19,6 +20,8 @@ import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
+import org.openslx.filetransfer.LocalChunkSource;
+import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
import org.openslx.filetransfer.util.HashChecker.HashResult;
@@ -57,17 +60,32 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
*/
private boolean fileWritable = true;
+ /**
+ * Called for getting local sources for certain chunks by checksum
+ */
+ private final LocalChunkSource localChunkSource;
+
+ /**
+ * Non-null if local copying is requested
+ */
+ private final LocalCopyManager localCopyManager;
+
static {
long maxMem = Runtime.getRuntime().maxMemory();
if ( maxMem == Long.MAX_VALUE ) {
- maxMem = 512;
+ LOGGER.warn( "Cannot determine maximum JVM memory -- assuming 1GB -- this might not be safe" );
+ maxMem = 1024;
+ } else {
+ maxMem /= ( 1024 * 1024 );
}
- int hashQueueLen = (int) ( maxMem / 100 );
+ final int maxLen = Math.max( 6, Runtime.getRuntime().availableProcessors() );
+ int hashQueueLen = (int) ( maxMem / 150 );
if ( hashQueueLen < 1 ) {
hashQueueLen = 1;
- } else if ( hashQueueLen > 6 ) {
- hashQueueLen = 6;
+ } else if ( hashQueueLen > maxLen ) {
+ hashQueueLen = maxLen;
}
+ LOGGER.debug( "Queue length: " + hashQueueLen );
HashChecker hc;
try {
hc = new HashChecker( "SHA-1", hashQueueLen );
@@ -79,11 +97,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
/*_*/
- public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes )
+ public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes, LocalChunkSource localChunkSource )
throws FileNotFoundException
{
super( transferId );
this.fileSize = fileSize;
+ this.localChunkSource = localChunkSource;
// Prepare path
tmpFileName = absFilePath;
tmpFileName.getParentFile().mkdirs();
@@ -96,6 +115,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
LOGGER.debug( "File " + tmpFileName + " is too long and could not be truncated" );
}
chunks = new ChunkList( fileSize, blockHashes );
+ if ( this.localChunkSource != null ) {
+ this.localCopyManager = new LocalCopyManager( this, this.chunks );
+ this.localCopyManager.start();
+ checkLocalCopyCandidates( blockHashes, 0 );
+ } else {
+ this.localCopyManager = null;
+ }
}
@Override
@@ -116,6 +142,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
}
potentialFinishTime.set( 0 );
+ if ( localCopyManager != null ) {
+ localCopyManager.interrupt();
+ }
safeClose( tmpFileHandle );
}
@@ -191,14 +220,16 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
LOGGER.debug( this.getId() + ": Rejecting null block hash list" );
return;
}
- chunks.updateSha1Sums( hashList );
+ int firstNew = chunks.updateSha1Sums( hashList );
+ // No hash checker? Neither hashing nor server side dedup will make sense
if ( hashChecker == null )
return;
+ // Check hashes of completed blocks
for ( int cnt = 0; cnt < 3; ++cnt ) {
FileChunk chunk = chunks.getUnhashedComplete();
if ( chunk == null )
break;
- byte[] data;
+ byte[] data = null;
try {
data = loadChunkFromFile( chunk );
} catch ( EOFException e1 ) {
@@ -227,6 +258,33 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
return;
}
}
+ // See if we have any candidates for local copy
+ checkLocalCopyCandidates( hashList, firstNew );
+ }
+
+ private void checkLocalCopyCandidates( List<byte[]> hashList, int firstNew )
+ {
+ if ( localChunkSource == null || hashList == null || hashList.isEmpty() || firstNew < 0 )
+ return; // firstNew < 0 means no new hashes, nothing new to look up
+ List<byte[]> sums;
+ if ( firstNew == 0 ) {
+ sums = hashList;
+ } else {
+ sums = hashList.subList( firstNew, hashList.size() );
+ }
+ if ( sums == null )
+ return;
+ sums = Collections.unmodifiableList( sums );
+ List<ChunkSource> sources = null;
+ try {
+ sources = localChunkSource.getCloneSources( sums );
+ } catch ( Exception e ) {
+ LOGGER.warn( "Could not get chunk sources", e );
+ }
+ if ( sources != null && !sources.isEmpty() ) {
+ chunks.markLocalCopyCandidates( sources );
+ }
+ localCopyManager.trigger();
}
private byte[] loadChunkFromFile( FileChunk chunk ) throws EOFException
@@ -288,22 +346,14 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
{
boolean needNewBuffer = false;
if ( currentChunk != null ) {
- needNewBuffer = chunkReceived( currentChunk, buffer );
- if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
- try {
- hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH );
+ try {
+ if ( chunkReceivedInternal( currentChunk, buffer ) ) {
needNewBuffer = true;
- } catch ( InterruptedException e ) {
- chunks.markCompleted( currentChunk, false );
- currentChunk = null;
- Thread.currentThread().interrupt();
- return null;
}
- } else {
- // We have no hash checker or the hash for the current chunk is unknown - flush to disk
- writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
- chunks.markCompleted( currentChunk, false );
- chunkStatusChanged( currentChunk );
+ } catch ( InterruptedException e ) {
+ LOGGER.info( "Downloader was interrupted while trying to hash" );
+ currentChunk = null;
+ Thread.currentThread().interrupt();
+ return null;
}
if ( needNewBuffer ) {
try {
@@ -357,6 +407,42 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
}
+ /**
+ * Process a chunk that was fully received, either from the uploading
+ * peer or via local copying.
+ * @param currentChunk the chunk that was just received
+ * @param buffer buffer holding the chunk's payload
+ * @return true if buffer is used internally and should not be modified in the future, false if
+ * reuse is safe
+ * @throws InterruptedException
+ */
+ final boolean chunkReceivedInternal( FileChunk currentChunk, byte[] buffer ) throws InterruptedException
+ {
+ boolean needNewBuffer = false;
+ try {
+ needNewBuffer = chunkReceived( currentChunk, buffer );
+ } catch (Exception e) {
+ LOGGER.warn( "Callback chunkReceived caused exception", e );
+ needNewBuffer = true; // To be on the safe side
+ }
+ InterruptedException passEx = null;
+ if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
+ try {
+ hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH );
+ return true;
+ } catch ( InterruptedException e ) {
+ passEx = e;
+ }
+ }
+ // We have no hash checker, or queueing to the hasher was interrupted,
+ // or the hash for the current chunk is unknown - flush to disk
+ writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
+ chunks.markCompleted( currentChunk, false );
+ chunkStatusChanged( currentChunk );
+ if ( passEx != null )
+ throw passEx;
+ return needNewBuffer;
+ }
+
public boolean addConnection( final Downloader connection, ExecutorService pool )
{
if ( state == TransferState.FINISHED ) {
@@ -384,6 +470,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
// If the download failed and we have a current chunk, put it back into
// the queue, so it will be handled again later...
chunks.markFailed( cbh.currentChunk );
+ // Possibly queue for local copy
+ if ( localCopyManager != null && cbh.currentChunk.sha1sum != null ) {
+ List<byte[]> lst = new ArrayList<>( 1 );
+ lst.add( cbh.currentChunk.sha1sum );
+ checkLocalCopyCandidates( lst, 0 );
+ }
chunkStatusChanged( cbh.currentChunk );
}
LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" );
@@ -399,6 +491,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
} else {
// Keep pumping unhashed chunks into the hasher
queueUnhashedChunk( true );
+ if ( localCopyManager != null ) {
+ localCopyManager.trigger();
+ }
}
}
} );
@@ -494,9 +589,15 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
chunks.markFailed( chunk );
chunkStatusChanged( chunk );
break;
+ case NONE:
+ LOGGER.warn( "Got hashCheckDone with result NONE" );
+ break;
}
// A block finished, see if we can queue a new one
queueUnhashedChunk( false );
+ if ( localCopyManager != null ) {
+ localCopyManager.trigger();
+ }
}
/**
@@ -537,12 +638,15 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
}
- private synchronized void finishUploadInternal()
+ final synchronized void finishUploadInternal()
{
if ( state == TransferState.FINISHED ) {
return;
}
safeClose( tmpFileHandle );
+ if ( localCopyManager != null ) {
+ localCopyManager.interrupt();
+ }
if ( state != TransferState.WORKING ) {
state = TransferState.ERROR;
} else {
@@ -585,4 +689,16 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
return false;
}
+ public boolean isServerSideCopyingEnabled()
+ {
+ return localCopyManager != null && !localCopyManager.isPaused();
+ }
+
+ public void enableServerSideCopying( boolean serverSideCopying )
+ {
+ if ( localCopyManager != null ) {
+ localCopyManager.setPaused( !serverSideCopying );
+ }
+ }
+
}
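
[Editor's note] The reworked static initializer sizes the hash queue from the JVM heap: roughly one slot per 150 MiB, clamped to the range [1, max(6, availableProcessors())]. A standalone sketch of the same heuristic (class name illustrative):

public class QueueLenDemo
{
	public static int hashQueueLen()
	{
		long maxMem = Runtime.getRuntime().maxMemory();
		if ( maxMem == Long.MAX_VALUE ) {
			maxMem = 1024; // limit unknown: assume 1GB, as the patch does
		} else {
			maxMem /= ( 1024 * 1024 ); // bytes -> MiB
		}
		int maxLen = Math.max( 6, Runtime.getRuntime().availableProcessors() );
		int len = (int) ( maxMem / 150 ); // one queue slot per 150 MiB of heap
		return Math.min( maxLen, Math.max( 1, len ) );
	}

	public static void main( String[] args )
	{
		System.out.println( "hash queue length: " + hashQueueLen() );
	}
}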
diff --git a/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java
new file mode 100644
index 0000000..8943524
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java
@@ -0,0 +1,208 @@
+package org.openslx.filetransfer.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
+import org.openslx.filetransfer.LocalChunkSource.SourceFile;
+import org.openslx.util.Util;
+
+public class LocalCopyManager extends Thread
+{
+
+ private static final Logger LOGGER = Logger.getLogger( LocalCopyManager.class );
+
+ private FileChunk currentChunk = null;
+
+ private final ChunkList chunkList;
+
+ private final IncomingTransferBase transfer;
+
+ private final Map<String, RandomAccessFile> sources = new HashMap<>();
+
+ private Semaphore hasWork = new Semaphore( 0 );
+
+ private AtomicInteger copyCount = new AtomicInteger();
+
+ private volatile boolean paused = true;
+
+ public LocalCopyManager( IncomingTransferBase transfer, ChunkList list )
+ {
+ super( "LocalCopyManager" );
+ this.transfer = transfer;
+ this.chunkList = list;
+ }
+
+ /**
+ * Trigger copying of another block if possible
+ */
+ public synchronized void trigger()
+ {
+ if ( this.paused )
+ return;
+ if ( !isAlive() ) {
+ LOGGER.warn( "Cannot be triggered when Thread is not running." );
+ if ( currentChunk != null ) {
+ chunkList.markFailed( currentChunk );
+ currentChunk = null;
+ }
+ return;
+ }
+ if ( currentChunk == null ) {
+ currentChunk = chunkList.getCopyCandidate();
+ hasWork.release();
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ try {
+ while ( !interrupted() ) {
+ while ( currentChunk != null ) {
+ hasWork.drainPermits();
+ copyChunk();
+ }
+ if ( !hasWork.tryAcquire( 10, TimeUnit.SECONDS ) ) {
+ if ( chunkList.isComplete() ) {
+ transfer.finishUploadInternal();
+ break;
+ } else if ( !transfer.isActive() ) {
+ break;
+ } else {
+ trigger();
+ }
+ }
+ }
+ } catch ( InterruptedException | IllegalStateException e ) {
+ interrupt();
+ }
+ synchronized ( this ) {
+ if ( currentChunk != null ) {
+ LOGGER.warn( "Still had a chunk when thread was interrupted." );
+ chunkList.markFailed( currentChunk );
+ currentChunk = null;
+ }
+ }
+ for ( RandomAccessFile file : sources.values() ) {
+ Util.safeClose( file );
+ }
+ LOGGER.debug( "My work here is done. Copied " + copyCount.get() + " chunks from " + sources.size() + " files." );
+ }
+
+ private void copyChunk() throws InterruptedException
+ {
+ ChunkSource source = currentChunk.getSources();
+ if ( source != null ) {
+ // OK
+ for ( ;; ) {
+ // Try every possible source file
+ SourceFile sourceFile = getOpenFile( source, currentChunk.range.getLength() );
+ if ( sourceFile == null ) {
+ // Was marked as having a source file, but now we got null -- most likely
+ // the source file doesn't exist or isn't readable
+ LOGGER.warn( "No open file for local copying!" );
+ break;
+ }
+ // OK
+ RandomAccessFile raf = sources.get( sourceFile.fileName );
+ byte[] buffer;
+ try {
+ raf.seek( sourceFile.offset );
+ // In order not to hinder (fast) upload of unknown blocks, throttle
+ // local copying as long as chunks are missing. Do this before allocating
+ // the buffer so we don't hold unused memory for no reason; the seek has
+ // already been done, so we know the file handle is not goofed up
+ if ( chunkList.hasLocallyMissingChunk() ) {
+ int delay;
+ HashChecker hc = transfer.getHashChecker();
+ if ( hc == null ) {
+ delay = 50;
+ } else {
+ delay = ( hc.getQueueFill() * 500 ) / hc.getQueueCapacity();
+ }
+ Thread.sleep( delay );
+ }
+ buffer = new byte[ sourceFile.chunkSize ];
+ raf.readFully( buffer );
+ } catch ( InterruptedException e ) {
+ throw e;
+ } catch ( Exception e ) {
+ LOGGER.warn( "Could not read chunk to replicate from " + sourceFile.fileName, e );
+ buffer = null;
+ if ( e instanceof IOException ) {
+ // Mark file as messed up
+ sources.put( sourceFile.fileName, null );
+ }
+ }
+ if ( buffer != null ) {
+ // All is well, read chunk locally, pass on
+ transfer.chunkReceivedInternal( currentChunk, buffer );
+ synchronized ( this ) {
+ currentChunk = null;
+ }
+ copyCount.incrementAndGet();
+ trigger();
+ return;
+ }
+ // Reaching here means failure
+ // We'll keep looping as long as there are source files available
+ }
+ // End of loop over source files
+ }
+ // FAILED
+ LOGGER.info( "Local copying failed, queueing for normal upload..." );
+ synchronized ( this ) {
+ chunkList.markFailed( currentChunk );
+ currentChunk = null;
+ }
+ }
+
+ private SourceFile getOpenFile( ChunkSource source, int requiredSize )
+ {
+ for ( SourceFile candidate : source.sourceCandidates ) {
+ if ( sources.get( candidate.fileName ) != null )
+ return candidate;
+ }
+ // Have to open
+ for ( SourceFile candidate : source.sourceCandidates ) {
+ if ( sources.containsKey( candidate.fileName ) ) // Maps to null (otherwise upper loop would have returned)
+ continue; // File is broken, don't use
+ if ( candidate.chunkSize != requiredSize )
+ continue;
+ File f = new File( candidate.fileName );
+ if ( !f.exists() ) {
+ sources.put( candidate.fileName, null ); // Mark for future
+ continue;
+ }
+ try {
+ RandomAccessFile raf = new RandomAccessFile( f, "r" );
+ sources.put( candidate.fileName, raf );
+ return candidate;
+ } catch ( Exception e ) {
+ LOGGER.info( "Cannot open " + candidate.fileName, e );
+ sources.put( candidate.fileName, null ); // Mark for future
+ }
+ }
+ // Nothing worked
+ return null;
+ }
+
+ public boolean isPaused()
+ {
+ return paused;
+ }
+
+ public void setPaused( boolean paused )
+ {
+ this.paused = paused;
+ }
+
+}
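
[Editor's note] The throttling logic in copyChunk() scales the per-chunk delay with the hash checker's queue fill, so local copying backs off while the peer still has chunks only it can supply. A small sketch of just that computation (class and method names illustrative):

public class CopyThrottleDemo
{
	// Mirrors the delay computation in LocalCopyManager.copyChunk()
	static int copyDelayMillis( int queueFill, int queueCapacity, boolean hasHashChecker )
	{
		if ( !hasHashChecker )
			return 50; // flat delay when no hash checker exists
		return ( queueFill * 500 ) / queueCapacity; // 0..500 ms, scaling with load
	}

	public static void main( String[] args )
	{
		System.out.println( copyDelayMillis( 0, 16, false ) ); // 50
		System.out.println( copyDelayMillis( 4, 16, true ) );  // 125
		System.out.println( copyDelayMillis( 16, 16, true ) ); // 500
	}
}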