From bacb060ba31fafebd8c0f15d0b6704732a37b482 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Thu, 23 Jul 2015 17:42:30 +0200 Subject: ChunkList.getMissing() now blocks for a while if there are still pending blocks Added HashChecker class to verify checksums of blocks --- .../org/openslx/filetransfer/util/ChunkList.java | 70 +++++---- .../org/openslx/filetransfer/util/FileChunk.java | 66 ++++---- .../org/openslx/filetransfer/util/HashChecker.java | 168 +++++++++++++++++++++ 3 files changed, 250 insertions(+), 54 deletions(-) create mode 100644 src/main/java/org/openslx/filetransfer/util/HashChecker.java (limited to 'src/main/java/org/openslx/filetransfer/util') diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index 9154dc8..88d40cf 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -7,9 +7,10 @@ import java.util.List; import org.apache.log4j.Logger; -public class ChunkList { +public class ChunkList +{ - private static final Logger LOGGER = Logger.getLogger(ChunkList.class); + private static final Logger LOGGER = Logger.getLogger( ChunkList.class ); /** * Chunks that are missing from the file @@ -21,44 +22,53 @@ public class ChunkList { */ private final List pendingChunks = new LinkedList<>(); - private final List completeChunks = new ArrayList<>(100); + private final List completeChunks = new ArrayList<>( 100 ); // 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying private final ByteBuffer statusArray; // Do we need to keep valid chunks, or chunks that failed too many times? - public ChunkList(long fileSize, List sha1Sums) { - FileChunk.createChunkList(missingChunks, fileSize, sha1Sums); - statusArray = ByteBuffer.allocate(missingChunks.size()); + public ChunkList( long fileSize, List sha1Sums ) + { + FileChunk.createChunkList( missingChunks, fileSize, sha1Sums ); + statusArray = ByteBuffer.allocate( missingChunks.size() ); } /** * Get a missing chunk, marking it pending. * * @return chunk marked as missing + * @throws InterruptedException */ - public synchronized FileChunk getMissing() { - if (missingChunks.isEmpty()) + public synchronized FileChunk getMissing() throws InterruptedException + { + if ( missingChunks.isEmpty() && pendingChunks.isEmpty() ) return null; - FileChunk c = missingChunks.remove(0); - pendingChunks.add(c); + if ( missingChunks.isEmpty() ) { + this.wait( 3000 ); + if ( missingChunks.isEmpty() ) + return null; + } + FileChunk c = missingChunks.remove( 0 ); + pendingChunks.add( c ); return c; } /** * Get the block status as byte representation. */ - public synchronized ByteBuffer getStatusArray() { + public synchronized ByteBuffer getStatusArray() + { byte[] array = statusArray.array(); //Arrays.fill(array, (byte)0); - for (FileChunk c : missingChunks) { + for ( FileChunk c : missingChunks ) { array[c.getChunkIndex()] = 1; } - for (FileChunk c : pendingChunks) { + for ( FileChunk c : pendingChunks ) { array[c.getChunkIndex()] = 2; } - for (FileChunk c : completeChunks) { + for ( FileChunk c : completeChunks ) { array[c.getChunkIndex()] = 0; } return statusArray; @@ -69,8 +79,9 @@ public class ChunkList { * * @return List containing all successfully transfered chunks */ - public synchronized List getCompleted() { - return new ArrayList<>(completeChunks); + public synchronized List getCompleted() + { + return new ArrayList<>( completeChunks ); } /** @@ -78,13 +89,15 @@ public class ChunkList { * * @param c The chunk in question */ - public synchronized void markSuccessful(FileChunk c) { - if (!pendingChunks.remove(c)) { - LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString() - + ", but chunk is not marked as currently transferring!"); + public synchronized void markSuccessful( FileChunk c ) + { + if ( !pendingChunks.remove( c ) ) { + LOGGER.warn( "Inconsistent state: markTransferred called for Chunk " + c.toString() + + ", but chunk is not marked as currently transferring!" ); return; } - completeChunks.add(c); + completeChunks.add( c ); + this.notifyAll(); } /** @@ -95,18 +108,21 @@ public class ChunkList { * @param c The chunk in question * @return Number of times transfer of this chunk failed */ - public synchronized int markFailed(FileChunk c) { - if (!pendingChunks.remove(c)) { - LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString() - + ", but chunk is not marked as currently transferring!"); + public synchronized int markFailed( FileChunk c ) + { + if ( !pendingChunks.remove( c ) ) { + LOGGER.warn( "Inconsistent state: markTransferred called for Chunk " + c.toString() + + ", but chunk is not marked as currently transferring!" ); return -1; } // Add as first element so it will be re-transmitted immediately - missingChunks.add(0, c); + missingChunks.add( 0, c ); + this.notifyAll(); return c.incFailed(); } - public synchronized boolean isComplete() { + public synchronized boolean isComplete() + { return missingChunks.isEmpty() && pendingChunks.isEmpty(); } diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java index 3e89b84..3ec6468 100644 --- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java +++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java @@ -6,17 +6,19 @@ import java.util.List; import org.openslx.filetransfer.FileRange; -public class FileChunk { +public class FileChunk +{ public static final int CHUNK_SIZE_MIB = 16; - public static final int CHUNK_SIZE = CHUNK_SIZE_MIB * (1024 * 1024); + public static final int CHUNK_SIZE = CHUNK_SIZE_MIB * ( 1024 * 1024 ); public final FileRange range; public final byte[] sha1sum; private int failCount = 0; - public FileChunk(long startOffset, long endOffset, byte[] sha1sum) { - this.range = new FileRange(startOffset, endOffset); + public FileChunk( long startOffset, long endOffset, byte[] sha1sum ) + { + this.range = new FileRange( startOffset, endOffset ); this.sha1sum = sha1sum; } @@ -26,52 +28,62 @@ public class FileChunk { * * @return Number of times the transfer failed now */ - public synchronized int incFailed() { + public synchronized int incFailed() + { return ++failCount; } - - public int getChunkIndex() { - return (int)(range.startOffset / CHUNK_SIZE); + + public int getChunkIndex() + { + return (int) ( range.startOffset / CHUNK_SIZE ); } - + @Override - public String toString() { + public String toString() + { return "[Chunk " + getChunkIndex() + " (" + range.startOffset + "-" + range.endOffset + "), fails: " + failCount + "]"; } // - public static int fileSizeToChunkCount(long fileSize) { - return (int) ((fileSize + CHUNK_SIZE - 1) / CHUNK_SIZE); + public static int fileSizeToChunkCount( long fileSize ) + { + return (int) ( ( fileSize + CHUNK_SIZE - 1 ) / CHUNK_SIZE ); } - public static void createChunkList(Collection list, long fileSize, List sha1Sums) { - if (fileSize < 0) - throw new IllegalArgumentException("fileSize cannot be negative"); - if (!list.isEmpty()) - throw new IllegalArgumentException("Passed list is not empty"); - long chunkCount = fileSizeToChunkCount(fileSize); - if (sha1Sums != null) { - if (sha1Sums.size() != chunkCount) + public static void createChunkList( Collection list, long fileSize, List sha1Sums ) + { + if ( fileSize < 0 ) + throw new IllegalArgumentException( "fileSize cannot be negative" ); + if ( !list.isEmpty() ) + throw new IllegalArgumentException( "Passed list is not empty" ); + long chunkCount = fileSizeToChunkCount( fileSize ); + if ( sha1Sums != null ) { + if ( sha1Sums.size() != chunkCount ) throw new IllegalArgumentException( - "Passed a sha1sum list, but hash count in list doesn't match expected chunk count"); + "Passed a sha1sum list, but hash count in list doesn't match expected chunk count" ); long offset = 0; - for (ByteBuffer sha1sum : sha1Sums) { // Do this as we don't know how efficient List.get(index) is... + for ( ByteBuffer sha1sum : sha1Sums ) { // Do this as we don't know how efficient List.get(index) is... long end = offset + CHUNK_SIZE; - if (end > fileSize) + if ( end > fileSize ) end = fileSize; - list.add(new FileChunk(offset, end, sha1sum.array())); + list.add( new FileChunk( offset, end, sha1sum.array() ) ); offset = end; } return; } long offset = 0; - while (offset < fileSize) { // ...otherwise we could share this code + while ( offset < fileSize ) { // ...otherwise we could share this code long end = offset + CHUNK_SIZE; - if (end > fileSize) + if ( end > fileSize ) end = fileSize; - list.add(new FileChunk(offset, end, null)); + list.add( new FileChunk( offset, end, null ) ); offset = end; } } + + public boolean hasSha1Sum() + { + return sha1sum != null && sha1sum.length == 20; + } } diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java new file mode 100644 index 0000000..2e2d72f --- /dev/null +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -0,0 +1,168 @@ +package org.openslx.filetransfer.util; + +import java.security.InvalidParameterException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; + +public class HashChecker +{ + private static final Logger LOGGER = Logger.getLogger( HashChecker.class ); + + private final BlockingQueue queue = new LinkedBlockingQueue<>( 10 ); + + private final List threads = new ArrayList<>(); + + private final String algorithm; + + private volatile boolean invalid = false; + + public HashChecker( String algorithm ) throws NoSuchAlgorithmException + { + this.algorithm = algorithm; + CheckThread thread = new CheckThread( false ); + thread.start(); + threads.add( thread ); + } + + private void threadFailed( CheckThread thread ) + { + synchronized ( threads ) { + if ( thread.extraThread ) + return; + invalid = true; + } + for ( ;; ) { + HashTask task = queue.poll(); + if ( task == null ) + break; + execCallback( task, HashResult.FAILURE ); + } + } + + @Override + protected void finalize() + { + try { + synchronized ( threads ) { + for ( Thread t : threads ) { + t.interrupt(); + } + } + } catch ( Throwable t ) { + LOGGER.warn( "Something threw in finalize", t ); + } + } + + private void execCallback( HashTask task, HashResult result ) + { + task.callback.hashCheckDone( result, task.data, task.chunk ); + } + + public void queue( FileChunk chunk, byte[] data, HashCheckCallback callback ) throws InterruptedException + { + if ( chunk.sha1sum == null ) + throw new NullPointerException( "Chunk has no sha1 hash" ); + if ( chunk.sha1sum.length != 20 ) + throw new InvalidParameterException( "Given chunk sha1 is not 20 bytes but " + chunk.sha1sum.length ); + HashTask task = new HashTask( data, chunk, callback ); + synchronized ( threads ) { + if ( invalid ) { + execCallback( task, HashResult.FAILURE ); + return; + } + if ( queue.remainingCapacity() <= 1 && threads.size() < Runtime.getRuntime().availableProcessors() ) { + try { + CheckThread thread = new CheckThread( true ); + thread.start(); + threads.add( thread ); + } catch ( Exception e ) { + LOGGER.warn( "Could not create additional hash checking thread", e ); + } + } + queue.put( task ); + } + } + + // ############################################################# \\ + + private class CheckThread extends Thread + { + private final MessageDigest md; + private final boolean extraThread; + + /** + * Worker thread doing the sha1 calculations and comparison + * + * @param isExtra whether this is an extra thread that should be shut down when the queue is + * empty again. + * @throws NoSuchAlgorithmException + */ + public CheckThread( boolean isExtra ) throws NoSuchAlgorithmException + { + super( "HashCheck" ); + md = MessageDigest.getInstance( algorithm ); + extraThread = isExtra; + } + + @Override + public void run() + { + while ( !interrupted() ) { + HashTask task; + // Wait for work + try { + task = queue.take(); + if ( task == null ) + continue; + } catch ( InterruptedException e ) { + LOGGER.info( "Interrupted while waiting for hash task", e ); + threadFailed( this ); + break; + } + // Calculate digest + md.update( task.data, 0, task.chunk.range.getLength() ); + byte[] digest = md.digest(); + HashResult result = Arrays.equals( digest, task.chunk.sha1sum ) ? HashResult.VALID : HashResult.INVALID; + execCallback( task, result ); + if ( extraThread && queue.isEmpty() ) { + LOGGER.info( "Stopping additional hash checker" ); + break; + } + } + } + } + + public static enum HashResult + { + VALID, // Hash matches + INVALID, // Hash does not match + FAILURE // Error calculating hash + } + + private static class HashTask + { + public final byte[] data; + public final FileChunk chunk; + public final HashCheckCallback callback; + + public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback ) + { + this.data = data; + this.chunk = chunk; + this.callback = callback; + } + } + + public static interface HashCheckCallback + { + public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk ); + } + +} -- cgit v1.2.3-55-g7522