From 6b3d669bc55432dbc2a4d3a5b1238ad8939c3eac Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 19 Aug 2015 17:17:58 +0200 Subject: Changes to chunk list related classes, support hash handling better --- .../org/openslx/filetransfer/util/ChunkList.java | 60 +++++++++++++++---- .../org/openslx/filetransfer/util/ChunkStatus.java | 19 ++++++ .../org/openslx/filetransfer/util/FileChunk.java | 69 ++++++++++++++-------- .../org/openslx/filetransfer/util/HashChecker.java | 9 ++- 4 files changed, 118 insertions(+), 39 deletions(-) create mode 100644 src/main/java/org/openslx/filetransfer/util/ChunkStatus.java (limited to 'src/main/java/org/openslx/filetransfer') diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index 88d40cf..d38c15d 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -2,6 +2,7 @@ package org.openslx.filetransfer.util; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -12,6 +13,11 @@ public class ChunkList private static final Logger LOGGER = Logger.getLogger( ChunkList.class ); + /** + * Here we keep a list of all chunks in the proper order, in case we quickly need to access one + */ + private final List allChunks; + /** * Chunks that are missing from the file */ @@ -29,10 +35,31 @@ public class ChunkList // Do we need to keep valid chunks, or chunks that failed too many times? - public ChunkList( long fileSize, List sha1Sums ) + public ChunkList( long fileSize, List sha1Sums ) { FileChunk.createChunkList( missingChunks, fileSize, sha1Sums ); statusArray = ByteBuffer.allocate( missingChunks.size() ); + allChunks = new ArrayList<>( missingChunks ); + } + + /** + * Update the sha1sums of all chunks. This is meant to be used if you passed an incomplete list + * before (with null elements), so you don't have to has hthe whole file before starting the + * upload, but periodically update it while thie upload is running. + * + * @param sha1Sums list of sums + */ + public synchronized void updateSha1Sums( List sha1Sums ) + { + int index = 0; + for ( byte[] sum : sha1Sums ) { + if ( index >= allChunks.size() ) + break; + if ( sum == null ) + continue; + allChunks.get( index ).setSha1Sum( sum ); + index++; + } } /** @@ -61,15 +88,8 @@ public class ChunkList public synchronized ByteBuffer getStatusArray() { byte[] array = statusArray.array(); - //Arrays.fill(array, (byte)0); - for ( FileChunk c : missingChunks ) { - array[c.getChunkIndex()] = 1; - } - for ( FileChunk c : pendingChunks ) { - array[c.getChunkIndex()] = 2; - } - for ( FileChunk c : completeChunks ) { - array[c.getChunkIndex()] = 0; + for ( int i = 0; i < array.length; ++i ) { + array[i] = allChunks.get( i ).getStatus().val; } return statusArray; } @@ -84,6 +104,24 @@ public class ChunkList return new ArrayList<>( completeChunks ); } + /** + * Get a chunk that is marked complete, has a sha1 hash, but has not been hash-checked yet. + * + * @return chunk + */ + public synchronized FileChunk getUnhashedComplete() + { + for ( Iterator it = completeChunks.iterator(); it.hasNext(); ) { + FileChunk chunk = it.next(); + if ( chunk.sha1sum != null && chunk.status == ChunkStatus.HASHING ) { + it.remove(); + pendingChunks.add( chunk ); + return chunk; + } + } + return null; + } + /** * Mark a chunk currently transferring as successfully transfered. * @@ -96,6 +134,7 @@ public class ChunkList + ", but chunk is not marked as currently transferring!" ); return; } + c.setStatus( ChunkStatus.COMPETE ); completeChunks.add( c ); this.notifyAll(); } @@ -116,6 +155,7 @@ public class ChunkList return -1; } // Add as first element so it will be re-transmitted immediately + c.setStatus( ChunkStatus.MISSING ); missingChunks.add( 0, c ); this.notifyAll(); return c.incFailed(); diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkStatus.java b/src/main/java/org/openslx/filetransfer/util/ChunkStatus.java new file mode 100644 index 0000000..fd54e34 --- /dev/null +++ b/src/main/java/org/openslx/filetransfer/util/ChunkStatus.java @@ -0,0 +1,19 @@ +package org.openslx.filetransfer.util; + +public enum ChunkStatus +{ + // 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying, 5 = hashing + COMPETE( 0 ), + MISSING( 1 ), + UPLOADING( 2 ), + QUEUED_FOR_COPY( 3 ), + COPYING( 4 ), + HASHING( 5 ); + + public final byte val; + + private ChunkStatus( int val ) + { + this.val = (byte)val; + } +} diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java index 3ec6468..0204e00 100644 --- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java +++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java @@ -1,7 +1,7 @@ package org.openslx.filetransfer.util; -import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Iterator; import java.util.List; import org.openslx.filetransfer.FileRange; @@ -9,17 +9,33 @@ import org.openslx.filetransfer.FileRange; public class FileChunk { + public static final int SHA1_LENGTH = 20; public static final int CHUNK_SIZE_MIB = 16; public static final int CHUNK_SIZE = CHUNK_SIZE_MIB * ( 1024 * 1024 ); public final FileRange range; - public final byte[] sha1sum; private int failCount = 0; + protected byte[] sha1sum; + protected ChunkStatus status = ChunkStatus.MISSING; public FileChunk( long startOffset, long endOffset, byte[] sha1sum ) { this.range = new FileRange( startOffset, endOffset ); + if ( sha1sum == null || sha1sum.length != SHA1_LENGTH ) { + this.sha1sum = null; + } else { + this.sha1sum = sha1sum; + } + } + + public synchronized void setSha1Sum( byte[] sha1sum ) + { + if ( this.sha1sum != null || sha1sum == null || sha1sum.length != SHA1_LENGTH ) + return; this.sha1sum = sha1sum; + if ( this.status == ChunkStatus.COMPETE ) { + this.status = ChunkStatus.HASHING; + } } /** @@ -44,6 +60,23 @@ public class FileChunk return "[Chunk " + getChunkIndex() + " (" + range.startOffset + "-" + range.endOffset + "), fails: " + failCount + "]"; } + public synchronized byte[] getSha1Sum() + { + return sha1sum; + } + + public synchronized ChunkStatus getStatus() + { + return status; + } + + protected synchronized void setStatus( ChunkStatus status ) + { + if ( status != null ) { + this.status = status; + } + } + // public static int fileSizeToChunkCount( long fileSize ) @@ -51,39 +84,27 @@ public class FileChunk return (int) ( ( fileSize + CHUNK_SIZE - 1 ) / CHUNK_SIZE ); } - public static void createChunkList( Collection list, long fileSize, List sha1Sums ) + public static void createChunkList( Collection list, long fileSize, List sha1Sums ) { if ( fileSize < 0 ) throw new IllegalArgumentException( "fileSize cannot be negative" ); if ( !list.isEmpty() ) throw new IllegalArgumentException( "Passed list is not empty" ); - long chunkCount = fileSizeToChunkCount( fileSize ); + long offset = 0; + Iterator hashIt = null; if ( sha1Sums != null ) { - if ( sha1Sums.size() != chunkCount ) - throw new IllegalArgumentException( - "Passed a sha1sum list, but hash count in list doesn't match expected chunk count" ); - long offset = 0; - for ( ByteBuffer sha1sum : sha1Sums ) { // Do this as we don't know how efficient List.get(index) is... - long end = offset + CHUNK_SIZE; - if ( end > fileSize ) - end = fileSize; - list.add( new FileChunk( offset, end, sha1sum.array() ) ); - offset = end; - } - return; + hashIt = sha1Sums.iterator(); } - long offset = 0; - while ( offset < fileSize ) { // ...otherwise we could share this code + while ( offset < fileSize ) { long end = offset + CHUNK_SIZE; if ( end > fileSize ) end = fileSize; - list.add( new FileChunk( offset, end, null ) ); + byte[] hash = null; + if ( hashIt != null && hashIt.hasNext() ) { + hash = hashIt.next(); + } + list.add( new FileChunk( offset, end, hash ) ); offset = end; } } - - public boolean hasSha1Sum() - { - return sha1sum != null && sha1sum.length == 20; - } } diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index 2e2d72f..3c173fb 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -1,6 +1,5 @@ package org.openslx.filetransfer.util; -import java.security.InvalidParameterException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -67,10 +66,9 @@ public class HashChecker public void queue( FileChunk chunk, byte[] data, HashCheckCallback callback ) throws InterruptedException { - if ( chunk.sha1sum == null ) + byte[] sha1Sum = chunk.getSha1Sum(); + if ( sha1Sum == null ) throw new NullPointerException( "Chunk has no sha1 hash" ); - if ( chunk.sha1sum.length != 20 ) - throw new InvalidParameterException( "Given chunk sha1 is not 20 bytes but " + chunk.sha1sum.length ); HashTask task = new HashTask( data, chunk, callback ); synchronized ( threads ) { if ( invalid ) { @@ -129,7 +127,7 @@ public class HashChecker // Calculate digest md.update( task.data, 0, task.chunk.range.getLength() ); byte[] digest = md.digest(); - HashResult result = Arrays.equals( digest, task.chunk.sha1sum ) ? HashResult.VALID : HashResult.INVALID; + HashResult result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; execCallback( task, result ); if ( extraThread && queue.isEmpty() ) { LOGGER.info( "Stopping additional hash checker" ); @@ -157,6 +155,7 @@ public class HashChecker this.data = data; this.chunk = chunk; this.callback = callback; + chunk.setStatus( ChunkStatus.HASHING ); } } -- cgit v1.2.3-55-g7522