summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
authorSimon Rettberg2015-07-23 17:42:30 +0200
committerSimon Rettberg2015-07-23 17:42:30 +0200
commitbacb060ba31fafebd8c0f15d0b6704732a37b482 (patch)
treea20be56f5d24fe8e242cd1fbb0589826bf88dc7f /src/main/java
parentVm meta parser: Add virtualizer getter, change type of description (diff)
downloadmaster-sync-shared-bacb060ba31fafebd8c0f15d0b6704732a37b482.tar.gz
master-sync-shared-bacb060ba31fafebd8c0f15d0b6704732a37b482.tar.xz
master-sync-shared-bacb060ba31fafebd8c0f15d0b6704732a37b482.zip
ChunkList.getMissing() now blocks for a while if there are still pending blocks
Added HashChecker class to verify checksums of blocks
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/org/openslx/filetransfer/util/ChunkList.java70
-rw-r--r--src/main/java/org/openslx/filetransfer/util/FileChunk.java66
-rw-r--r--src/main/java/org/openslx/filetransfer/util/HashChecker.java168
3 files changed, 250 insertions, 54 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index 9154dc8..88d40cf 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -7,9 +7,10 @@ import java.util.List;
import org.apache.log4j.Logger;
-public class ChunkList {
+public class ChunkList
+{
- private static final Logger LOGGER = Logger.getLogger(ChunkList.class);
+ private static final Logger LOGGER = Logger.getLogger( ChunkList.class );
/**
* Chunks that are missing from the file
@@ -21,44 +22,53 @@ public class ChunkList {
*/
private final List<FileChunk> pendingChunks = new LinkedList<>();
- private final List<FileChunk> completeChunks = new ArrayList<>(100);
+ private final List<FileChunk> completeChunks = new ArrayList<>( 100 );
// 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying
private final ByteBuffer statusArray;
// Do we need to keep valid chunks, or chunks that failed too many times?
- public ChunkList(long fileSize, List<ByteBuffer> sha1Sums) {
- FileChunk.createChunkList(missingChunks, fileSize, sha1Sums);
- statusArray = ByteBuffer.allocate(missingChunks.size());
+ public ChunkList( long fileSize, List<ByteBuffer> sha1Sums )
+ {
+ FileChunk.createChunkList( missingChunks, fileSize, sha1Sums );
+ statusArray = ByteBuffer.allocate( missingChunks.size() );
}
/**
* Get a missing chunk, marking it pending.
*
* @return chunk marked as missing
+ * @throws InterruptedException
*/
- public synchronized FileChunk getMissing() {
- if (missingChunks.isEmpty())
+ public synchronized FileChunk getMissing() throws InterruptedException
+ {
+ if ( missingChunks.isEmpty() && pendingChunks.isEmpty() )
return null;
- FileChunk c = missingChunks.remove(0);
- pendingChunks.add(c);
+ if ( missingChunks.isEmpty() ) {
+ this.wait( 3000 );
+ if ( missingChunks.isEmpty() )
+ return null;
+ }
+ FileChunk c = missingChunks.remove( 0 );
+ pendingChunks.add( c );
return c;
}
/**
* Get the block status as byte representation.
*/
- public synchronized ByteBuffer getStatusArray() {
+ public synchronized ByteBuffer getStatusArray()
+ {
byte[] array = statusArray.array();
//Arrays.fill(array, (byte)0);
- for (FileChunk c : missingChunks) {
+ for ( FileChunk c : missingChunks ) {
array[c.getChunkIndex()] = 1;
}
- for (FileChunk c : pendingChunks) {
+ for ( FileChunk c : pendingChunks ) {
array[c.getChunkIndex()] = 2;
}
- for (FileChunk c : completeChunks) {
+ for ( FileChunk c : completeChunks ) {
array[c.getChunkIndex()] = 0;
}
return statusArray;
@@ -69,8 +79,9 @@ public class ChunkList {
*
* @return List containing all successfully transfered chunks
*/
- public synchronized List<FileChunk> getCompleted() {
- return new ArrayList<>(completeChunks);
+ public synchronized List<FileChunk> getCompleted()
+ {
+ return new ArrayList<>( completeChunks );
}
/**
@@ -78,13 +89,15 @@ public class ChunkList {
*
* @param c The chunk in question
*/
- public synchronized void markSuccessful(FileChunk c) {
- if (!pendingChunks.remove(c)) {
- LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString()
- + ", but chunk is not marked as currently transferring!");
+ public synchronized void markSuccessful( FileChunk c )
+ {
+ if ( !pendingChunks.remove( c ) ) {
+ LOGGER.warn( "Inconsistent state: markTransferred called for Chunk " + c.toString()
+ + ", but chunk is not marked as currently transferring!" );
return;
}
- completeChunks.add(c);
+ completeChunks.add( c );
+ this.notifyAll();
}
/**
@@ -95,18 +108,21 @@ public class ChunkList {
* @param c The chunk in question
* @return Number of times transfer of this chunk failed
*/
- public synchronized int markFailed(FileChunk c) {
- if (!pendingChunks.remove(c)) {
- LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString()
- + ", but chunk is not marked as currently transferring!");
+ public synchronized int markFailed( FileChunk c )
+ {
+ if ( !pendingChunks.remove( c ) ) {
+ LOGGER.warn( "Inconsistent state: markTransferred called for Chunk " + c.toString()
+ + ", but chunk is not marked as currently transferring!" );
return -1;
}
// Add as first element so it will be re-transmitted immediately
- missingChunks.add(0, c);
+ missingChunks.add( 0, c );
+ this.notifyAll();
return c.incFailed();
}
- public synchronized boolean isComplete() {
+ public synchronized boolean isComplete()
+ {
return missingChunks.isEmpty() && pendingChunks.isEmpty();
}
diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
index 3e89b84..3ec6468 100644
--- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java
+++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
@@ -6,17 +6,19 @@ import java.util.List;
import org.openslx.filetransfer.FileRange;
-public class FileChunk {
+public class FileChunk
+{
public static final int CHUNK_SIZE_MIB = 16;
- public static final int CHUNK_SIZE = CHUNK_SIZE_MIB * (1024 * 1024);
+ public static final int CHUNK_SIZE = CHUNK_SIZE_MIB * ( 1024 * 1024 );
public final FileRange range;
public final byte[] sha1sum;
private int failCount = 0;
- public FileChunk(long startOffset, long endOffset, byte[] sha1sum) {
- this.range = new FileRange(startOffset, endOffset);
+ public FileChunk( long startOffset, long endOffset, byte[] sha1sum )
+ {
+ this.range = new FileRange( startOffset, endOffset );
this.sha1sum = sha1sum;
}
@@ -26,52 +28,62 @@ public class FileChunk {
*
* @return Number of times the transfer failed now
*/
- public synchronized int incFailed() {
+ public synchronized int incFailed()
+ {
return ++failCount;
}
-
- public int getChunkIndex() {
- return (int)(range.startOffset / CHUNK_SIZE);
+
+ public int getChunkIndex()
+ {
+ return (int) ( range.startOffset / CHUNK_SIZE );
}
-
+
@Override
- public String toString() {
+ public String toString()
+ {
return "[Chunk " + getChunkIndex() + " (" + range.startOffset + "-" + range.endOffset + "), fails: " + failCount + "]";
}
//
- public static int fileSizeToChunkCount(long fileSize) {
- return (int) ((fileSize + CHUNK_SIZE - 1) / CHUNK_SIZE);
+ public static int fileSizeToChunkCount( long fileSize )
+ {
+ return (int) ( ( fileSize + CHUNK_SIZE - 1 ) / CHUNK_SIZE );
}
- public static void createChunkList(Collection<FileChunk> list, long fileSize, List<ByteBuffer> sha1Sums) {
- if (fileSize < 0)
- throw new IllegalArgumentException("fileSize cannot be negative");
- if (!list.isEmpty())
- throw new IllegalArgumentException("Passed list is not empty");
- long chunkCount = fileSizeToChunkCount(fileSize);
- if (sha1Sums != null) {
- if (sha1Sums.size() != chunkCount)
+ public static void createChunkList( Collection<FileChunk> list, long fileSize, List<ByteBuffer> sha1Sums )
+ {
+ if ( fileSize < 0 )
+ throw new IllegalArgumentException( "fileSize cannot be negative" );
+ if ( !list.isEmpty() )
+ throw new IllegalArgumentException( "Passed list is not empty" );
+ long chunkCount = fileSizeToChunkCount( fileSize );
+ if ( sha1Sums != null ) {
+ if ( sha1Sums.size() != chunkCount )
throw new IllegalArgumentException(
- "Passed a sha1sum list, but hash count in list doesn't match expected chunk count");
+ "Passed a sha1sum list, but hash count in list doesn't match expected chunk count" );
long offset = 0;
- for (ByteBuffer sha1sum : sha1Sums) { // Do this as we don't know how efficient List.get(index) is...
+ for ( ByteBuffer sha1sum : sha1Sums ) { // Do this as we don't know how efficient List.get(index) is...
long end = offset + CHUNK_SIZE;
- if (end > fileSize)
+ if ( end > fileSize )
end = fileSize;
- list.add(new FileChunk(offset, end, sha1sum.array()));
+ list.add( new FileChunk( offset, end, sha1sum.array() ) );
offset = end;
}
return;
}
long offset = 0;
- while (offset < fileSize) { // ...otherwise we could share this code
+ while ( offset < fileSize ) { // ...otherwise we could share this code
long end = offset + CHUNK_SIZE;
- if (end > fileSize)
+ if ( end > fileSize )
end = fileSize;
- list.add(new FileChunk(offset, end, null));
+ list.add( new FileChunk( offset, end, null ) );
offset = end;
}
}
+
+ public boolean hasSha1Sum()
+ {
+ return sha1sum != null && sha1sum.length == 20;
+ }
}
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
new file mode 100644
index 0000000..2e2d72f
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
@@ -0,0 +1,168 @@
+package org.openslx.filetransfer.util;
+
+import java.security.InvalidParameterException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+public class HashChecker
+{
+ private static final Logger LOGGER = Logger.getLogger( HashChecker.class );
+
+ private final BlockingQueue<HashTask> queue = new LinkedBlockingQueue<>( 10 );
+
+ private final List<Thread> threads = new ArrayList<>();
+
+ private final String algorithm;
+
+ private volatile boolean invalid = false;
+
+ public HashChecker( String algorithm ) throws NoSuchAlgorithmException
+ {
+ this.algorithm = algorithm;
+ CheckThread thread = new CheckThread( false );
+ thread.start();
+ threads.add( thread );
+ }
+
+ private void threadFailed( CheckThread thread )
+ {
+ synchronized ( threads ) {
+ if ( thread.extraThread )
+ return;
+ invalid = true;
+ }
+ for ( ;; ) {
+ HashTask task = queue.poll();
+ if ( task == null )
+ break;
+ execCallback( task, HashResult.FAILURE );
+ }
+ }
+
+ @Override
+ protected void finalize()
+ {
+ try {
+ synchronized ( threads ) {
+ for ( Thread t : threads ) {
+ t.interrupt();
+ }
+ }
+ } catch ( Throwable t ) {
+ LOGGER.warn( "Something threw in finalize", t );
+ }
+ }
+
+ private void execCallback( HashTask task, HashResult result )
+ {
+ task.callback.hashCheckDone( result, task.data, task.chunk );
+ }
+
+ public void queue( FileChunk chunk, byte[] data, HashCheckCallback callback ) throws InterruptedException
+ {
+ if ( chunk.sha1sum == null )
+ throw new NullPointerException( "Chunk has no sha1 hash" );
+ if ( chunk.sha1sum.length != 20 )
+ throw new InvalidParameterException( "Given chunk sha1 is not 20 bytes but " + chunk.sha1sum.length );
+ HashTask task = new HashTask( data, chunk, callback );
+ synchronized ( threads ) {
+ if ( invalid ) {
+ execCallback( task, HashResult.FAILURE );
+ return;
+ }
+ if ( queue.remainingCapacity() <= 1 && threads.size() < Runtime.getRuntime().availableProcessors() ) {
+ try {
+ CheckThread thread = new CheckThread( true );
+ thread.start();
+ threads.add( thread );
+ } catch ( Exception e ) {
+ LOGGER.warn( "Could not create additional hash checking thread", e );
+ }
+ }
+ queue.put( task );
+ }
+ }
+
+ // ############################################################# \\
+
+ private class CheckThread extends Thread
+ {
+ private final MessageDigest md;
+ private final boolean extraThread;
+
+ /**
+ * Worker thread doing the sha1 calculations and comparison
+ *
+ * @param isExtra whether this is an extra thread that should be shut down when the queue is
+ * empty again.
+ * @throws NoSuchAlgorithmException
+ */
+ public CheckThread( boolean isExtra ) throws NoSuchAlgorithmException
+ {
+ super( "HashCheck" );
+ md = MessageDigest.getInstance( algorithm );
+ extraThread = isExtra;
+ }
+
+ @Override
+ public void run()
+ {
+ while ( !interrupted() ) {
+ HashTask task;
+ // Wait for work
+ try {
+ task = queue.take();
+ if ( task == null )
+ continue;
+ } catch ( InterruptedException e ) {
+ LOGGER.info( "Interrupted while waiting for hash task", e );
+ threadFailed( this );
+ break;
+ }
+ // Calculate digest
+ md.update( task.data, 0, task.chunk.range.getLength() );
+ byte[] digest = md.digest();
+ HashResult result = Arrays.equals( digest, task.chunk.sha1sum ) ? HashResult.VALID : HashResult.INVALID;
+ execCallback( task, result );
+ if ( extraThread && queue.isEmpty() ) {
+ LOGGER.info( "Stopping additional hash checker" );
+ break;
+ }
+ }
+ }
+ }
+
+ public static enum HashResult
+ {
+ VALID, // Hash matches
+ INVALID, // Hash does not match
+ FAILURE // Error calculating hash
+ }
+
+ private static class HashTask
+ {
+ public final byte[] data;
+ public final FileChunk chunk;
+ public final HashCheckCallback callback;
+
+ public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback )
+ {
+ this.data = data;
+ this.chunk = chunk;
+ this.callback = callback;
+ }
+ }
+
+ public static interface HashCheckCallback
+ {
+ public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk );
+ }
+
+}