package org.openslx.filetransfer.util;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

/**
 * Asynchronously verifies the hash of file chunks.
 *
 * Chunks are submitted via {@link #queue(FileChunk, byte[], HashCheckCallback, boolean)}
 * and are hashed by one permanent worker thread plus, when the queue is about
 * to fill up, additional temporary worker threads (at most one thread per
 * available processor in total). The outcome of every check is reported
 * through the {@link HashCheckCallback} passed along with the chunk.
 *
 * Once the permanent worker thread has stopped, the checker is considered
 * invalid: all queued and all subsequently submitted chunks are reported as
 * {@link HashResult#FAILURE}.
 */
public class HashChecker
{
	private static final Logger LOGGER = Logger.getLogger( HashChecker.class );

	/** Tasks waiting to be hashed; bounded by the queue length given at construction. */
	private final BlockingQueue<HashTask> queue;

	/** All currently running worker threads. Also serves as the lock guarding {@link #invalid}. */
	private final List<Thread> threads = new ArrayList<>();

	/** Name of the digest algorithm handed to {@link MessageDigest#getInstance(String)}. */
	private final String algorithm;

	/** Set once the permanent worker thread has stopped; no more hashing will happen. */
	private volatile boolean invalid = false;

	/**
	 * Create a hash checker with a queue length of 10.
	 *
	 * @param algorithm name of the message digest algorithm to use
	 * @throws NoSuchAlgorithmException if the algorithm is not available
	 */
	public HashChecker( String algorithm ) throws NoSuchAlgorithmException
	{
		this( algorithm, 10 );
	}

	/**
	 * Create a hash checker and start its permanent worker thread.
	 *
	 * @param algorithm name of the message digest algorithm to use
	 * @param queueLen maximum number of chunks that can wait for hashing
	 * @throws NoSuchAlgorithmException if the algorithm is not available
	 */
	public HashChecker( String algorithm, int queueLen ) throws NoSuchAlgorithmException
	{
		this.algorithm = algorithm;
		this.queue = new LinkedBlockingQueue<>( queueLen );
		CheckThread thread = new CheckThread( false );
		thread.start();
		threads.add( thread );
	}

	/**
	 * Called by a worker thread right before it terminates. Removes the thread
	 * from the list of workers; if it was the permanent worker, the checker is
	 * marked invalid and everything still queued is reported as failed.
	 *
	 * @param thread the worker that is about to terminate
	 */
	private void threadFailed( CheckThread thread )
	{
		synchronized ( threads ) {
			threads.remove( thread );
			if ( thread.extraThread )
				return;
			invalid = true;
		}
		failQueuedTasks();
	}

	/**
	 * Remove every task currently in the queue and report it as
	 * {@link HashResult#FAILURE}. Each task is polled by exactly one caller,
	 * so concurrent invocations never report the same task twice.
	 */
	private void failQueuedTasks()
	{
		LOGGER.debug( "Marking all queued chunks as failed" );
		for ( HashTask task = queue.poll(); task != null; task = queue.poll() ) {
			execCallback( task, HashResult.FAILURE );
		}
	}

	/**
	 * Interrupts all worker threads.
	 *
	 * NOTE(review): the workers are inner-class instances and therefore keep a
	 * reference to this object, so this finalizer presumably cannot run while
	 * a worker is still alive - confirm whether an explicit shutdown is needed.
	 */
	@Override
	protected void finalize()
	{
		try {
			synchronized ( threads ) {
				for ( Thread t : threads ) {
					t.interrupt();
				}
			}
		} catch ( Throwable t ) {
			LOGGER.warn( "Something threw in finalize", t );
		}
	}

	/**
	 * Invoke the callback of the given task, shielding the caller from
	 * anything the callback throws.
	 *
	 * @param task task whose callback should be invoked
	 * @param result result to report
	 */
	private void execCallback( HashTask task, HashResult result )
	{
		try {
			task.callback.hashCheckDone( result, task.data, task.chunk );
		} catch ( Throwable t ) {
			LOGGER.warn( "HashCheck callback threw!", t );
		}
	}

	/**
	 * Queue the given chunk for hashing. The chunk should be in pending state.
	 *
	 * If the checker is invalid (its permanent worker has stopped), the callback
	 * is invoked immediately with {@link HashResult#FAILURE} and the chunk's
	 * status is left untouched. Otherwise the chunk's status is set to
	 * {@link ChunkStatus#HASHING} before the chunk is enqueued; note that this
	 * status is not reverted if a non-blocking call is rejected because the
	 * queue is full.
	 *
	 * @param chunk chunk to hash
	 * @param data binary data of this chunk
	 * @param callback callback to call when hashing is done
	 * @param blocking if true, wait for free space in the queue; if false,
	 *           return immediately when the queue is full
	 * @return true if the chunk was handled, false if the queue was full and rejected the chunk.
	 * @throws InterruptedException if interrupted while waiting for free space in the queue
	 * @throws NullPointerException if the chunk has no hash to compare against
	 */
	public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, boolean blocking ) throws InterruptedException
	{
		byte[] sha1Sum = chunk.getSha1Sum();
		if ( sha1Sum == null )
			throw new NullPointerException( "Chunk has no sha1 hash" );
		HashTask task = new HashTask( data, chunk, callback );
		boolean checkerDead;
		synchronized ( threads ) {
			checkerDead = invalid;
			// Queue is about to run full: add a temporary worker, up to one thread per processor
			if ( !checkerDead && queue.remainingCapacity() <= 1
					&& threads.size() < Runtime.getRuntime().availableProcessors() ) {
				try {
					CheckThread thread = new CheckThread( true );
					thread.start();
					threads.add( thread );
				} catch ( Exception e ) {
					LOGGER.warn( "Could not create additional hash checking thread", e );
				}
			}
		}
		if ( checkerDead ) {
			// Run the foreign callback outside the lock so it cannot stall other callers
			execCallback( task, HashResult.FAILURE );
			return true;
		}
		chunk.setStatus( ChunkStatus.HASHING );
		if ( blocking ) {
			queue.put( task );
		} else if ( !queue.offer( task ) ) {
			return false;
		}
		if ( invalid ) {
			// The permanent worker died after the check above; nobody might be left
			// to process the task we just queued, so make sure it gets reported.
			failQueuedTasks();
		}
		return true;
	}

	// #############################################################

	/**
	 * Worker thread taking tasks from the queue, hashing them and reporting
	 * the result through the task's callback.
	 */
	private class CheckThread extends Thread
	{
		private final MessageDigest md;
		private final boolean extraThread;

		/**
		 * Worker thread doing the hash calculations and comparison
		 *
		 * @param isExtra whether this is an extra thread that should be shut down when the queue is
		 *           empty again.
		 * @throws NoSuchAlgorithmException if the configured algorithm is not available
		 */
		public CheckThread( boolean isExtra ) throws NoSuchAlgorithmException
		{
			super( "HashCheck" );
			md = MessageDigest.getInstance( algorithm );
			extraThread = isExtra;
			setPriority( Thread.NORM_PRIORITY - 1 );
		}

		@Override
		public void run()
		{
			try {
				while ( !interrupted() ) {
					HashTask task;
					// Wait for work
					try {
						task = queue.take();
					} catch ( InterruptedException e ) {
						// The thread terminates right after this, so the interrupt
						// flag is deliberately not restored.
						LOGGER.info( "Interrupted while waiting for hash task", e );
						break;
					}
					execCallback( task, checkHash( task ) );
					if ( extraThread && queue.isEmpty() ) {
						break;
					}
				}
			} finally {
				// Always deregister, even on unexpected errors, so a dead permanent
				// worker is noticed and pending chunks get reported as failed.
				try {
					if ( extraThread ) {
						LOGGER.info( "Stopped additional hash checker" );
					} else {
						LOGGER.info( "Stopped MAIN hash checker" );
					}
				} finally {
					threadFailed( this );
				}
			}
		}

		/**
		 * Hash the data of the given task and compare it to the chunk's expected hash.
		 *
		 * @param task task to check
		 * @return VALID or INVALID depending on the comparison, FAILURE if the
		 *         hash could not be calculated (e.g. data missing or too short)
		 */
		private HashResult checkHash( HashTask task )
		{
			try {
				md.update( task.data, 0, task.chunk.range.getLength() );
				byte[] digest = md.digest();
				return Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
			} catch ( RuntimeException e ) {
				LOGGER.warn( "Could not calculate hash of chunk", e );
				// Discard any partially fed input so the next task starts clean
				md.reset();
				return HashResult.FAILURE;
			}
		}
	}

	/**
	 * Outcome of a hash check.
	 */
	public static enum HashResult
	{
		VALID, // Hash matches
		INVALID, // Hash does not match
		FAILURE // Error calculating hash
	}

	/**
	 * Immutable bundle of everything needed to check one chunk.
	 */
	private static class HashTask
	{
		public final byte[] data;
		public final FileChunk chunk;
		public final HashCheckCallback callback;

		public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback )
		{
			this.data = data;
			this.chunk = chunk;
			this.callback = callback;
		}
	}

	/**
	 * Receiver of hash check results.
	 */
	public static interface HashCheckCallback
	{
		/**
		 * Called when the check of a chunk has finished. Depending on the
		 * outcome this is invoked from a worker thread or from the thread that
		 * called {@link HashChecker#queue(FileChunk, byte[], HashCheckCallback, boolean)}.
		 *
		 * @param result outcome of the check
		 * @param data the data that was passed when queueing the chunk
		 * @param chunk the chunk that was checked
		 */
		public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk );
	}
}