diff options
Diffstat (limited to 'dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java')
-rw-r--r-- | dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java | 81 |
1 files changed, 59 insertions, 22 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java index cf593200..8ef12e12 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java @@ -13,6 +13,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -29,8 +30,9 @@ public class AsyncHashGenerator extends Thread { private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class); private static final ThreadPoolExecutor HASH_WORK_POOL = new GrowingThreadPoolExecutor(1, - Math.max(1, Runtime.getRuntime().availableProcessors() - 1), - 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2), + Math.max(1, (int)Math.min(Runtime.getRuntime().availableProcessors() - 1, + Runtime.getRuntime().maxMemory() / (FileChunk.CHUNK_SIZE * 3))), + 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), new PrioThreadFactory("HashGen"), new ThreadPoolExecutor.CallerRunsPolicy()); private static final ThreadLocal<MessageDigest> SHA1_DIGESTER = new ThreadLocal<MessageDigest>() { @@ -52,9 +54,13 @@ public class AsyncHashGenerator extends Thread { private final List<FileChunk> chunkList; private long nextReadMsg, nextDoneMsg, nextSendingMsg; // for debug spam :-( private boolean readingDone = false; - private boolean hashingDone = false; + private AtomicInteger pendingHashes = new AtomicInteger(); private volatile boolean isCanceled = false; + + static { + LOGGER.info("Using " + HASH_WORK_POOL.getMaximumPoolSize() + " hash workers."); + } public AsyncHashGenerator(File uploadFile) throws FileNotFoundException, NoSuchAlgorithmException { try { @@ -70,6 +76,15 @@ public class AsyncHashGenerator extends Thread { setDaemon(true); setName("HashGenerator"); } + + @Override + public synchronized void start() { + if (isCanceled) { + LOGGER.warn("Cannot start hashing if it has been cancelled before"); + } else { + super.start(); + } + } public void setUploadToken(String token) { if (!isCanceled && this.uploadToken == null) { @@ -90,14 +105,15 @@ public class AsyncHashGenerator extends Thread { } Block block; try { - byte[] buffer; - try { - buffer = new byte[chunk.range.getLength()]; - } catch (OutOfMemoryError e) { - LOGGER.info("Low memory - slowing down hashing"); - Util.sleep(5000); - continue; - } + byte[] buffer = null; + do { + try { + buffer = new byte[chunk.range.getLength()]; + } catch (OutOfMemoryError e) { + LOGGER.info("Low memory - slowing down hashing"); + Util.sleep(5000); + } + } while (buffer == null); file.seek(chunk.range.startOffset); file.readFully(buffer); block = new Block(chunk, buffer); @@ -111,23 +127,26 @@ public class AsyncHashGenerator extends Thread { } // if (System.currentTimeMillis() > nextReadMsg) { - nextReadMsg = System.currentTimeMillis() + 30000; + nextReadMsg = System.currentTimeMillis() + 60000; LOGGER.debug("Read chunk " + chunk.getChunkIndex()); } // for (;;) { if (HASH_WORK_POOL.isTerminating() || HASH_WORK_POOL.isTerminated() || HASH_WORK_POOL.isShutdown()) { LOGGER.warn("Aborting current hash job - pool has shut down"); + isCanceled = true; Thread.currentThread().interrupt(); return; } try { + pendingHashes.incrementAndGet(); HASH_WORK_POOL.execute(block); // Don't hash too furiously in the background if the upload didn't start yet if (uploadToken == null && chunk.getChunkIndex() > 4) { Util.sleep(200); } } catch (RejectedExecutionException e) { + pendingHashes.decrementAndGet(); LOGGER.warn("Hash pool worker rejected a hash job!? Retrying..."); Util.sleep(1000); continue; @@ -142,7 +161,7 @@ public class AsyncHashGenerator extends Thread { } } - public void cancel() { + public synchronized void cancel() { LOGGER.debug("Cancelled externally"); isCanceled = true; } @@ -152,7 +171,7 @@ public class AsyncHashGenerator extends Thread { */ private class Block implements Runnable { public final FileChunk chunk; - public final byte[] buffer; + public byte[] buffer; public Block(FileChunk chunk, byte[] buffer) { this.chunk = chunk; @@ -163,7 +182,14 @@ public class AsyncHashGenerator extends Thread { public void run() { MessageDigest digester = SHA1_DIGESTER.get(); digester.update(buffer, 0, chunk.range.getLength()); + this.buffer = null; // Clear reference before calling function below byte[] hash = digester.digest(); + synchronized (this) { + if (isCanceled) { + pendingHashes.decrementAndGet(); + return; + } + } hashDone(chunk, hash); } } @@ -180,7 +206,7 @@ public class AsyncHashGenerator extends Thread { int chunkIndex = chunk.getChunkIndex(); boolean wasLastChunk = false; if (System.currentTimeMillis() > nextDoneMsg) { - nextDoneMsg = System.currentTimeMillis() + 30000; + nextDoneMsg = System.currentTimeMillis() + 60000; LOGGER.debug("Done hashing chunk " + chunkIndex); } synchronized (chunkHashes) { @@ -224,8 +250,7 @@ public class AsyncHashGenerator extends Thread { isCanceled = true; } } - if (wasLastChunk) { - hashingDone = true; + if (pendingHashes.decrementAndGet() == 0) { cleanupIfDone(); } } @@ -235,15 +260,25 @@ public class AsyncHashGenerator extends Thread { * a reference to this class, at least we will not prevent this stuff from being * garbage collected. */ - private void cleanupIfDone() { - if (uploadToken == null && !isCanceled) + private synchronized void cleanupIfDone() { + if (!readingDone && isAlive()) return; - if (!readingDone) + if (uploadToken == null && !isCanceled) return; - if (!hashingDone && !isCanceled) + if (pendingHashes.get() != 0) return; + isCanceled = true; chunkHashes.clear(); chunkList.clear(); + LOGGER.debug("Hasher cleaned up"); + } + + /** + * @return true if this instance is not dong anything meaningful anymore + * and no reference to it needs to be kept around. + */ + public boolean canBeDiscarded() { + return isCanceled || (!isAlive() && pendingHashes.get() == 0); } /** @@ -252,6 +287,8 @@ public class AsyncHashGenerator extends Thread { * @return false if the token is not known to the server */ private boolean submitHashes(boolean mustSucceed) { + if (isCanceled) + return true; List<ByteBuffer> subList; boolean d; synchronized (chunkHashes) { @@ -262,7 +299,7 @@ public class AsyncHashGenerator extends Thread { d = System.currentTimeMillis() > nextSendingMsg; } if (d) { - nextSendingMsg = System.currentTimeMillis() + 30000; + nextSendingMsg = System.currentTimeMillis() + 60000; LOGGER.debug("Preparing to send hash list to server (" + subList.size() + " / " + (uploadToken != null) + ")"); } if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list |