summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/util/HashChecker.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util/HashChecker.java')
-rw-r--r--src/main/java/org/openslx/filetransfer/util/HashChecker.java45
1 files changed, 33 insertions, 12 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
index 273bc7e..2c404db 100644
--- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java
+++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
@@ -7,6 +7,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
@@ -24,7 +25,9 @@ public class HashChecker
private final String algorithm;
- private volatile boolean invalid = false;
+ private boolean invalid = false;
+
+ private final int queueCapacity;
public HashChecker( String algorithm ) throws NoSuchAlgorithmException
{
@@ -34,6 +37,7 @@ public class HashChecker
public HashChecker( String algorithm, int queueLen ) throws NoSuchAlgorithmException
{
this.algorithm = algorithm;
+ this.queueCapacity = queueLen;
this.queue = new LinkedBlockingQueue<>( queueLen );
CheckThread thread = new CheckThread( false );
thread.start();
@@ -44,6 +48,7 @@ public class HashChecker
{
synchronized ( threads ) {
threads.remove( thread );
+ LOGGER.debug( "Check threads: " + threads.size() );
if ( thread.extraThread )
return;
invalid = true;
@@ -109,6 +114,7 @@ public class HashChecker
CheckThread thread = new CheckThread( true );
thread.start();
threads.add( thread );
+ LOGGER.debug( "Check threads: " + threads.size() );
} catch ( Exception e ) {
LOGGER.warn( "Could not create additional hash checking thread", e );
}
@@ -127,6 +133,19 @@ public class HashChecker
return true;
}
+ /**
+ * Get number of chunks currently waiting for a worker thread.
+ */
+ public int getQueueFill()
+ {
+ return queue.size();
+ }
+
+ public int getQueueCapacity()
+ {
+ return queueCapacity;
+ }
+
// ############################################################# \\
private class CheckThread extends Thread
@@ -156,16 +175,23 @@ public class HashChecker
HashTask task;
// Wait for work
try {
- task = queue.take();
- if ( task == null )
- continue;
+ if ( extraThread ) {
+ task = queue.poll( 30, TimeUnit.SECONDS );
+ if ( task == null ) {
+ break;
+ }
+ } else {
+ task = queue.take();
+ if ( task == null )
+ continue;
+ }
} catch ( InterruptedException e ) {
LOGGER.info( "Interrupted while waiting for hash task", e );
break;
}
HashResult result = HashResult.NONE;
if ( task.doHash ) {
- // Calculate digest
+ // Calculate digest
md.update( task.data, 0, task.chunk.range.getLength() );
byte[] digest = md.digest();
result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
@@ -175,14 +201,9 @@ public class HashChecker
task.chunk.calculateDnbd3Crc32( task.data );
}
execCallback( task, result );
- if ( extraThread && queue.isEmpty() ) {
- break;
- }
}
- if ( extraThread ) {
- LOGGER.info( "Stopped additional hash checker" );
- } else {
- LOGGER.info( "Stopped MAIN hash checker" );
+ if ( !extraThread ) {
+ LOGGER.warn( "Stopped MAIN hash checker" );
}
threadFailed( this );
}