diff options
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util/HashChecker.java')
-rw-r--r-- | src/main/java/org/openslx/filetransfer/util/HashChecker.java | 45 |
1 files changed, 33 insertions, 12 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index 273bc7e..2c404db 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -7,6 +7,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; @@ -24,7 +25,9 @@ public class HashChecker private final String algorithm; - private volatile boolean invalid = false; + private boolean invalid = false; + + private final int queueCapacity; public HashChecker( String algorithm ) throws NoSuchAlgorithmException { @@ -34,6 +37,7 @@ public class HashChecker public HashChecker( String algorithm, int queueLen ) throws NoSuchAlgorithmException { this.algorithm = algorithm; + this.queueCapacity = queueLen; this.queue = new LinkedBlockingQueue<>( queueLen ); CheckThread thread = new CheckThread( false ); thread.start(); @@ -44,6 +48,7 @@ public class HashChecker { synchronized ( threads ) { threads.remove( thread ); + LOGGER.debug( "Check threads: " + threads.size() ); if ( thread.extraThread ) return; invalid = true; @@ -109,6 +114,7 @@ public class HashChecker CheckThread thread = new CheckThread( true ); thread.start(); threads.add( thread ); + LOGGER.debug( "Check threads: " + threads.size() ); } catch ( Exception e ) { LOGGER.warn( "Could not create additional hash checking thread", e ); } @@ -127,6 +133,19 @@ public class HashChecker return true; } + /** + * Get number of chunks currently waiting for a worker thread. + */ + public int getQueueFill() + { + return queue.size(); + } + + public int getQueueCapacity() + { + return queueCapacity; + } + // ############################################################# \\ private class CheckThread extends Thread @@ -156,16 +175,23 @@ public class HashChecker HashTask task; // Wait for work try { - task = queue.take(); - if ( task == null ) - continue; + if ( extraThread ) { + task = queue.poll( 30, TimeUnit.SECONDS ); + if ( task == null ) { + break; + } + } else { + task = queue.take(); + if ( task == null ) + continue; + } } catch ( InterruptedException e ) { LOGGER.info( "Interrupted while waiting for hash task", e ); break; } HashResult result = HashResult.NONE; if ( task.doHash ) { - // Calculate digest + // Calculate digest md.update( task.data, 0, task.chunk.range.getLength() ); byte[] digest = md.digest(); result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; @@ -175,14 +201,9 @@ public class HashChecker task.chunk.calculateDnbd3Crc32( task.data ); } execCallback( task, result ); - if ( extraThread && queue.isEmpty() ) { - break; - } } - if ( extraThread ) { - LOGGER.info( "Stopped additional hash checker" ); - } else { - LOGGER.info( "Stopped MAIN hash checker" ); + if ( !extraThread ) { + LOGGER.warn( "Stopped MAIN hash checker" ); } threadFailed( this ); } |