diff options
author | Simon Rettberg | 2018-05-11 17:35:51 +0200 |
---|---|---|
committer | Simon Rettberg | 2018-05-11 17:35:51 +0200 |
commit | 8cf60948213a141b86e9a7128359545040f97276 (patch) | |
tree | 20662a196e92717b2b1147c586b946472e9471d1 /src/main/java/org/openslx/filetransfer/util | |
parent | Do what the javadoc says... (diff) | |
download | master-sync-shared-8cf60948213a141b86e9a7128359545040f97276.tar.gz master-sync-shared-8cf60948213a141b86e9a7128359545040f97276.tar.xz master-sync-shared-8cf60948213a141b86e9a7128359545040f97276.zip |
Support copying existing chunks server side
Can speed up uploads if the storage backend is fast enough.
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util')
5 files changed, 495 insertions, 46 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index cd1bc69..11f64e8 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -11,6 +11,7 @@ import java.util.List; import java.util.zip.CRC32; import org.apache.log4j.Logger; +import org.openslx.filetransfer.LocalChunkSource.ChunkSource; import org.openslx.util.ThriftUtil; public class ChunkList @@ -26,7 +27,7 @@ public class ChunkList /** * Chunks that are missing from the file */ - private final List<FileChunk> missingChunks = new LinkedList<>(); + private final LinkedList<FileChunk> missingChunks = new LinkedList<>(); /** * Chunks that are currently being uploaded or hash-checked @@ -58,21 +59,26 @@ public class ChunkList * upload, but periodically update it while the upload is running. * * @param sha1Sums list of sums + * @return lowest index of chunk that didn't have a sha1sum before, -1 if no new ones */ - public synchronized void updateSha1Sums( List<byte[]> sha1Sums ) + public synchronized int updateSha1Sums( List<byte[]> sha1Sums ) { int index = 0; + int firstNew = -1; for ( byte[] sum : sha1Sums ) { if ( index >= allChunks.size() ) break; if ( sum != null ) { - allChunks.get( index ).setSha1Sum( sum ); + if ( allChunks.get( index ).setSha1Sum( sum ) && firstNew == -1 ) { + firstNew = index; + } if ( !hasChecksum ) { hasChecksum = true; } } index++; } + return firstNew; } /** @@ -120,13 +126,84 @@ public class ChunkList if ( missingChunks.isEmpty() ) return null; } - FileChunk c = missingChunks.remove( 0 ); + FileChunk c = missingChunks.removeFirst(); c.setStatus( ChunkStatus.UPLOADING ); pendingChunks.add( c ); return c; } /** + * Returns true if this list contains a chunk with state MISSING, + * which means the chunk doesn't have a sha1 known to exist in + * another image. + * @return + */ + public synchronized boolean hasLocallyMissingChunk() + { + return !missingChunks.isEmpty() && missingChunks.peekFirst().status == ChunkStatus.MISSING; + } + + /** + * Get a chunk that is marked as candidate for copying. + * Returns null if none are available. + */ + public synchronized FileChunk getCopyCandidate() + { + if ( missingChunks.isEmpty() ) + return null; + FileChunk last = missingChunks.removeLast(); + if ( last.status != ChunkStatus.QUEUED_FOR_COPY ) { + // Put back + missingChunks.add( last ); + return null; + } + // Is a candidate + last.setStatus( ChunkStatus.COPYING ); + pendingChunks.add( last ); + return last; + } + + /** + * Mark the given chunks for potential local copying instead of receiving them + * from peer. + * @param firstNew + * @param sources + */ + public synchronized void markLocalCopyCandidates( List<ChunkSource> sources ) + { + for ( ChunkSource src : sources ) { + try { + if ( src.sourceCandidates.isEmpty() ) + continue; + List<FileChunk> append = null; + for ( Iterator<FileChunk> it = missingChunks.iterator(); it.hasNext(); ) { + FileChunk chunk = it.next(); + if ( !Arrays.equals( chunk.sha1sum, src.sha1sum ) ) + continue; + if ( chunk.status == ChunkStatus.QUEUED_FOR_COPY ) + continue; + // Bingo + if ( append == null ) { + append = new ArrayList<>( 20 ); + } + it.remove(); + chunk.setStatus( ChunkStatus.QUEUED_FOR_COPY ); + chunk.setSource( src ); + append.add( chunk ); + } + if ( append != null ) { + // Move all the chunks queued for copying to the end of the list, so when + // we getMissing() a chunk for upload from client, these ones would only + // come last, in case reading from storage and writing back is really slow + missingChunks.addAll( append ); + } + } catch ( Exception e ) { + LOGGER.warn( "chunk clone list if messed up", e ); + } + } + } + + /** * Get the block status as byte representation. */ public synchronized ByteBuffer getStatusArray() @@ -235,7 +312,7 @@ public class ChunkList } // Add as first element so it will be re-transmitted immediately c.setStatus( ChunkStatus.MISSING ); - missingChunks.add( 0, c ); + missingChunks.addFirst( c ); this.notifyAll(); return c.incFailed(); } diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java index 6450af2..f302b3c 100644 --- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java +++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java @@ -4,11 +4,15 @@ import java.util.Iterator; import java.util.List; import java.util.zip.CRC32; +import org.apache.log4j.Logger; import org.openslx.filetransfer.FileRange; +import org.openslx.filetransfer.LocalChunkSource.ChunkSource; public class FileChunk { + private static final Logger LOGGER = Logger.getLogger( FileChunk.class ); + /** * Length in bytes of binary sha1 representation */ @@ -22,6 +26,7 @@ public class FileChunk protected CRC32 crc32; protected ChunkStatus status = ChunkStatus.MISSING; private boolean writtenToDisk = false; + private ChunkSource localSource = null; public FileChunk( long startOffset, long endOffset, byte[] sha1sum ) { @@ -33,14 +38,15 @@ public class FileChunk } } - synchronized void setSha1Sum( byte[] sha1sum ) + synchronized boolean setSha1Sum( byte[] sha1sum ) { if ( this.sha1sum != null || sha1sum == null || sha1sum.length != SHA1_LENGTH ) - return; + return false; this.sha1sum = sha1sum; if ( this.status == ChunkStatus.COMPLETE ) { this.status = ChunkStatus.HASHING; } + return true; } /** @@ -79,19 +85,29 @@ public class FileChunk { // As this is usually called before we validated the sha1, handle the case where // this gets called multiple times and only remember the last result + long old = Long.MAX_VALUE; if ( crc32 == null ) { crc32 = new CRC32(); } else { + LOGGER.info( "Redoing CRC32 of Chunk " + getChunkIndex() ); + old = crc32.getValue(); crc32.reset(); } - int chunkLength = range.getLength(); - crc32.update( data, 0, chunkLength ); - if ( ( chunkLength % 4096 ) != 0 ) { + int expectedLength = range.getLength(); + if ( expectedLength > data.length ) { + LOGGER.error( "Chunk #" + getChunkIndex() + ": " + data.length + " instead of " + expectedLength + " for " + getChunkIndex() ); + } + crc32.update( data, 0, expectedLength ); + if ( ( expectedLength % 4096 ) != 0 ) { // DNBD3 virtually pads all images to be a multiple of 4KiB in size, // so simulate that here too - byte[] padding = new byte[ 4096 - ( chunkLength % 4096 ) ]; + LOGGER.debug( "Block " + getChunkIndex() + " not multiple of 4k." ); + byte[] padding = new byte[ 4096 - ( expectedLength % 4096 ) ]; crc32.update( padding ); } + if ( old != Long.MAX_VALUE && old != crc32.getValue() ) { + LOGGER.warn( String.format( "Changed from %x to %x", old, crc32.getValue() ) ); + } } public synchronized void getCrc32Le( byte[] buffer, int offset ) @@ -119,7 +135,7 @@ public class FileChunk if ( status != null ) { if ( status == ChunkStatus.COMPLETE ) { this.writtenToDisk = true; - } else if ( status == ChunkStatus.MISSING ) { + } else if ( status == ChunkStatus.MISSING || status == ChunkStatus.QUEUED_FOR_COPY ) { this.writtenToDisk = false; } this.status = status; @@ -161,4 +177,15 @@ public class FileChunk { return failCount; } + + public void setSource( ChunkSource src ) + { + this.localSource = src; + } + + public ChunkSource getSources() + { + return this.localSource; + } + } diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index 273bc7e..2c404db 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -7,6 +7,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; @@ -24,7 +25,9 @@ public class HashChecker private final String algorithm; - private volatile boolean invalid = false; + private boolean invalid = false; + + private final int queueCapacity; public HashChecker( String algorithm ) throws NoSuchAlgorithmException { @@ -34,6 +37,7 @@ public class HashChecker public HashChecker( String algorithm, int queueLen ) throws NoSuchAlgorithmException { this.algorithm = algorithm; + this.queueCapacity = queueLen; this.queue = new LinkedBlockingQueue<>( queueLen ); CheckThread thread = new CheckThread( false ); thread.start(); @@ -44,6 +48,7 @@ public class HashChecker { synchronized ( threads ) { threads.remove( thread ); + LOGGER.debug( "Check threads: " + threads.size() ); if ( thread.extraThread ) return; invalid = true; @@ -109,6 +114,7 @@ public class HashChecker CheckThread thread = new CheckThread( true ); thread.start(); threads.add( thread ); + LOGGER.debug( "Check threads: " + threads.size() ); } catch ( Exception e ) { LOGGER.warn( "Could not create additional hash checking thread", e ); } @@ -127,6 +133,19 @@ public class HashChecker return true; } + /** + * Get number of chunks currently waiting for a worker thread. + */ + public int getQueueFill() + { + return queue.size(); + } + + public int getQueueCapacity() + { + return queueCapacity; + } + // ############################################################# \\ private class CheckThread extends Thread @@ -156,16 +175,23 @@ public class HashChecker HashTask task; // Wait for work try { - task = queue.take(); - if ( task == null ) - continue; + if ( extraThread ) { + task = queue.poll( 30, TimeUnit.SECONDS ); + if ( task == null ) { + break; + } + } else { + task = queue.take(); + if ( task == null ) + continue; + } } catch ( InterruptedException e ) { LOGGER.info( "Interrupted while waiting for hash task", e ); break; } HashResult result = HashResult.NONE; if ( task.doHash ) { - // Calculate digest + // Calculate digest md.update( task.data, 0, task.chunk.range.getLength() ); byte[] digest = md.digest(); result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; @@ -175,14 +201,9 @@ public class HashChecker task.chunk.calculateDnbd3Crc32( task.data ); } execCallback( task, result ); - if ( extraThread && queue.isEmpty() ) { - break; - } } - if ( extraThread ) { - LOGGER.info( "Stopped additional hash checker" ); - } else { - LOGGER.info( "Stopped MAIN hash checker" ); + if ( !extraThread ) { + LOGGER.warn( "Stopped MAIN hash checker" ); } threadFailed( this ); } diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 4fe6d88..c2a9443 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -9,6 +9,7 @@ import java.nio.ByteBuffer; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; @@ -19,6 +20,8 @@ import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.filetransfer.DataReceivedCallback; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.FileRange; +import org.openslx.filetransfer.LocalChunkSource; +import org.openslx.filetransfer.LocalChunkSource.ChunkSource; import org.openslx.filetransfer.WantRangeCallback; import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; import org.openslx.filetransfer.util.HashChecker.HashResult; @@ -57,17 +60,32 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H */ private boolean fileWritable = true; + /** + * Called for getting local sources for certain chunks by checksum + */ + private final LocalChunkSource localChunkSource; + + /** + * Non-null if local copying is requested + */ + private final LocalCopyManager localCopyManager; + static { long maxMem = Runtime.getRuntime().maxMemory(); if ( maxMem == Long.MAX_VALUE ) { - maxMem = 512; + LOGGER.warn( "Cannot determine maximum JVM memory -- assuming 1GB -- this might not be safe" ); + maxMem = 1024; + } else { + maxMem /= ( 1024 * 1024 ); } - int hashQueueLen = (int) ( maxMem / 100 ); + final int maxLen = Math.max( 6, Runtime.getRuntime().availableProcessors() ); + int hashQueueLen = (int) ( maxMem / 150 ); if ( hashQueueLen < 1 ) { hashQueueLen = 1; - } else if ( hashQueueLen > 6 ) { - hashQueueLen = 6; + } else if ( hashQueueLen > maxLen ) { + hashQueueLen = maxLen; } + LOGGER.debug( "Queue length: " + hashQueueLen ); HashChecker hc; try { hc = new HashChecker( "SHA-1", hashQueueLen ); @@ -79,11 +97,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H /*_*/ - public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes ) + public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes, LocalChunkSource localChunkSource ) throws FileNotFoundException { super( transferId ); this.fileSize = fileSize; + this.localChunkSource = localChunkSource; // Prepare path tmpFileName = absFilePath; tmpFileName.getParentFile().mkdirs(); @@ -96,6 +115,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H LOGGER.debug( "File " + tmpFileName + " is too long and could not be truncated" ); } chunks = new ChunkList( fileSize, blockHashes ); + if ( this.localChunkSource != null ) { + this.localCopyManager = new LocalCopyManager( this, this.chunks ); + this.localCopyManager.start(); + checkLocalCopyCandidates( blockHashes, 0 ); + } else { + this.localCopyManager = null; + } } @Override @@ -116,6 +142,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } } potentialFinishTime.set( 0 ); + if ( localCopyManager != null ) { + localCopyManager.interrupt(); + } safeClose( tmpFileHandle ); } @@ -191,14 +220,16 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H LOGGER.debug( this.getId() + ": Rejecting null block hash list" ); return; } - chunks.updateSha1Sums( hashList ); + int firstNew = chunks.updateSha1Sums( hashList ); + // No hash checker? Neither hashing nor server side dedup will make sense if ( hashChecker == null ) return; + // Check hashes of completed blocks for ( int cnt = 0; cnt < 3; ++cnt ) { FileChunk chunk = chunks.getUnhashedComplete(); if ( chunk == null ) break; - byte[] data; + byte[] data = null; try { data = loadChunkFromFile( chunk ); } catch ( EOFException e1 ) { @@ -227,6 +258,33 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return; } } + // See if we have any candidates for local copy + checkLocalCopyCandidates( hashList, firstNew ); + } + + private void checkLocalCopyCandidates( List<byte[]> hashList, int firstNew ) + { + if ( localChunkSource == null || hashList == null || hashList.isEmpty() ) + return; + List<byte[]> sums; + if ( firstNew == 0 ) { + sums = hashList; + } else { + sums = hashList.subList( firstNew, hashList.size() ); + } + if ( sums == null ) + return; + sums = Collections.unmodifiableList( sums ); + List<ChunkSource> sources = null; + try { + sources = localChunkSource.getCloneSources( sums ); + } catch ( Exception e ) { + LOGGER.warn( "Could not get chunk sources", e ); + } + if ( sources != null && !sources.isEmpty() ) { + chunks.markLocalCopyCandidates( sources ); + } + localCopyManager.trigger(); } private byte[] loadChunkFromFile( FileChunk chunk ) throws EOFException @@ -288,22 +346,14 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H { boolean needNewBuffer = false; if ( currentChunk != null ) { - needNewBuffer = chunkReceived( currentChunk, buffer ); - if ( hashChecker != null && currentChunk.getSha1Sum() != null ) { - try { - hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH ); + try { + if ( chunkReceivedInternal( currentChunk, buffer ) ) { needNewBuffer = true; - } catch ( InterruptedException e ) { - chunks.markCompleted( currentChunk, false ); - currentChunk = null; - Thread.currentThread().interrupt(); - return null; } - } else { - // We have no hash checker or the hash for the current chunk is unknown - flush to disk - writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer ); - chunks.markCompleted( currentChunk, false ); - chunkStatusChanged( currentChunk ); + } catch ( InterruptedException e3 ) { + LOGGER.info( "Downloader was interrupted when trying to hash" ); + currentChunk = null; + return null; } if ( needNewBuffer ) { try { @@ -357,6 +407,42 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } } + /** + * + * @param currentChunk + * @param buffer + * @return true if buffer is used internally and should not be modified in the future, false if + * reuse is safe + * @throws InterruptedException + */ + final boolean chunkReceivedInternal( FileChunk currentChunk, byte[] buffer ) throws InterruptedException + { + boolean needNewBuffer = false; + try { + needNewBuffer = chunkReceived( currentChunk, buffer ); + } catch (Exception e) { + LOGGER.warn( "Callback chunkReceived caused exception", e ); + needNewBuffer = true; // To be on the safe side + } + InterruptedException passEx = null; + if ( hashChecker != null && currentChunk.getSha1Sum() != null ) { + try { + hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH ); + return true; + } catch ( InterruptedException e ) { + passEx = e; + } + } + // We have no hash checker, or hasher rejected block, + // or the hash for the current chunk is unknown - flush to disk + writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer ); + chunks.markCompleted( currentChunk, false ); + chunkStatusChanged( currentChunk ); + if ( passEx != null ) + throw passEx; + return needNewBuffer; + } + public boolean addConnection( final Downloader connection, ExecutorService pool ) { if ( state == TransferState.FINISHED ) { @@ -384,6 +470,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H // If the download failed and we have a current chunk, put it back into // the queue, so it will be handled again later... chunks.markFailed( cbh.currentChunk ); + // Possibly queue for local copy + if ( localCopyManager != null && cbh.currentChunk.sha1sum != null ) { + List<byte[]> lst = new ArrayList<>( 1 ); + lst.add( cbh.currentChunk.sha1sum ); + checkLocalCopyCandidates( lst, 0 ); + } chunkStatusChanged( cbh.currentChunk ); } LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" ); @@ -399,6 +491,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } else { // Keep pumping unhashed chunks into the hasher queueUnhashedChunk( true ); + if ( localCopyManager != null ) { + localCopyManager.trigger(); + } } } } ); @@ -494,9 +589,15 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunks.markFailed( chunk ); chunkStatusChanged( chunk ); break; + case NONE: + LOGGER.warn( "Got hashCheckDone with result NONE" ); + break; } // A block finished, see if we can queue a new one queueUnhashedChunk( false ); + if ( localCopyManager != null ) { + localCopyManager.trigger(); + } } /** @@ -537,12 +638,15 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } } - private synchronized void finishUploadInternal() + final synchronized void finishUploadInternal() { if ( state == TransferState.FINISHED ) { return; } safeClose( tmpFileHandle ); + if ( localCopyManager != null ) { + localCopyManager.interrupt(); + } if ( state != TransferState.WORKING ) { state = TransferState.ERROR; } else { @@ -585,4 +689,16 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return false; } + public boolean isServerSideCopyingEnabled() + { + return localCopyManager != null && !localCopyManager.isPaused(); + } + + public void enableServerSideCopying( boolean serverSideCopying ) + { + if ( localCopyManager != null ) { + localCopyManager.setPaused( !serverSideCopying ); + } + } + } diff --git a/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java new file mode 100644 index 0000000..8943524 --- /dev/null +++ b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java @@ -0,0 +1,208 @@ +package org.openslx.filetransfer.util; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.log4j.Logger; +import org.openslx.filetransfer.LocalChunkSource.ChunkSource; +import org.openslx.filetransfer.LocalChunkSource.SourceFile; +import org.openslx.util.Util; + +public class LocalCopyManager extends Thread +{ + + private static final Logger LOGGER = Logger.getLogger( LocalCopyManager.class ); + + private FileChunk currentChunk = null; + + private final ChunkList chunkList; + + private final IncomingTransferBase transfer; + + private final Map<String, RandomAccessFile> sources = new HashMap<>(); + + private Semaphore hasWork = new Semaphore( 0 ); + + private AtomicInteger copyCount = new AtomicInteger(); + + private boolean paused = true; + + public LocalCopyManager( IncomingTransferBase transfer, ChunkList list ) + { + super( "LocalCopyManager" ); + this.transfer = transfer; + this.chunkList = list; + } + + /** + * Trigger copying of another block if possible + */ + public synchronized void trigger() + { + if ( this.paused ) + return; + if ( !isAlive() ) { + LOGGER.warn( "Cannot be triggered when Thread is not running." ); + if ( currentChunk != null ) { + chunkList.markFailed( currentChunk ); + currentChunk = null; + } + return; + } + if ( currentChunk == null ) { + currentChunk = chunkList.getCopyCandidate(); + hasWork.release(); + } + } + + @Override + public void run() + { + try { + while ( !interrupted() ) { + while ( currentChunk != null ) { + hasWork.drainPermits(); + copyChunk(); + } + if ( !hasWork.tryAcquire( 10, TimeUnit.SECONDS ) ) { + if ( chunkList.isComplete() ) { + transfer.finishUploadInternal(); + break; + } else if ( !transfer.isActive() ) { + break; + } else { + trigger(); + } + } + } + } catch ( InterruptedException | IllegalStateException e ) { + interrupt(); + } + synchronized ( this ) { + if ( currentChunk != null ) { + LOGGER.warn( "Still had a chunk when thread was interrupted." ); + chunkList.markFailed( currentChunk ); + currentChunk = null; + } + } + for ( RandomAccessFile file : sources.values() ) { + Util.safeClose( file ); + } + LOGGER.debug( "My work here is done. Copied " + copyCount.get() + " chunks from " + sources.size() + " files." ); + } + + private void copyChunk() throws InterruptedException + { + ChunkSource source = currentChunk.getSources(); + if ( source != null ) { + // OK + for ( ;; ) { + // Try every possible source file + SourceFile sourceFile = getOpenFile( source, currentChunk.range.getLength() ); + if ( sourceFile == null ) { + // Was marked as having a source file, but now we got null -- most likely + // the source file doesn't exist or isn't readable + LOGGER.warn( "No open file for local copying!" ); + break; + } + // OK + RandomAccessFile raf = sources.get( sourceFile.fileName ); + byte[] buffer; + try { + raf.seek( sourceFile.offset ); + // In order not to hinder (fast) upload of unknown blocks, throttle + // local copying as long as chunks are missing - do before allocating buffer + // so we don't hold allocated unused memory for no reason, but the seek has + // been done so we know the file handle is not goofed up + if ( chunkList.hasLocallyMissingChunk() ) { + int delay; + HashChecker hc = transfer.getHashChecker(); + if ( hc == null ) { + delay = 50; + } else { + delay = ( hc.getQueueFill() * 500 ) / hc.getQueueCapacity(); + } + Thread.sleep( delay ); + } + buffer = new byte[ sourceFile.chunkSize ]; + raf.readFully( buffer ); + } catch ( InterruptedException e ) { + throw e; + } catch ( Exception e ) { + LOGGER.warn( "Could not read chunk to replicate from " + sourceFile.fileName, e ); + buffer = null; + if ( e instanceof IOException ) { + // Mark file as messed up + sources.put( sourceFile.fileName, null ); + } + } + if ( buffer != null ) { + // All is well, read chunk locally, pass on + transfer.chunkReceivedInternal( currentChunk, buffer ); + synchronized ( this ) { + currentChunk = null; + } + copyCount.incrementAndGet(); + trigger(); + return; + } + // Reaching here means failure + // We'll keep looping as long as there are source files available + } + // End of loop over source files + } + // FAILED + LOGGER.info( "Local copying failed, queueing for normal upload..." ); + synchronized ( this ) { + chunkList.markFailed( currentChunk ); + currentChunk = null; + } + } + + private SourceFile getOpenFile( ChunkSource source, int requiredSize ) + { + for ( SourceFile candidate : source.sourceCandidates ) { + if ( sources.get( candidate.fileName ) != null ) + return candidate; + } + // Have to open + for ( SourceFile candidate : source.sourceCandidates ) { + if ( sources.containsKey( candidate.fileName ) ) // Maps to null (otherwise upper loop would have returned) + continue; // File is broken, don't use + if ( candidate.chunkSize != requiredSize ) + continue; + File f = new File( candidate.fileName ); + if ( !f.exists() ) { + sources.put( candidate.fileName, null ); // Mark for future + continue; + } + try { + RandomAccessFile raf = new RandomAccessFile( f, "r" ); + sources.put( candidate.fileName, raf ); + return candidate; + } catch ( Exception e ) { + LOGGER.info( "Cannot open " + candidate.fileName, e ); + sources.put( candidate.fileName, null ); // Mark for future + } + } + // Nothing worked + return null; + } + + public boolean isPaused() + { + return paused; + } + + public void setPaused( boolean paused ) + { + this.paused = paused; + } + +} |