diff options
author | Simon Rettberg | 2018-05-14 15:42:45 +0200 |
---|---|---|
committer | Simon Rettberg | 2018-05-14 15:42:45 +0200 |
commit | 39647236b389f311d071d7093c938163ea757dd4 (patch) | |
tree | b4a46a780ccbc262da715a2673869fea267963fe | |
parent | [client] UploadPanel: Add checkbox to toggle server side copying (diff) | |
download | tutor-module-39647236b389f311d071d7093c938163ea757dd4.tar.gz tutor-module-39647236b389f311d071d7093c938163ea757dd4.tar.xz tutor-module-39647236b389f311d071d7093c938163ea757dd4.zip |
[client] Speed up hashing
-rw-r--r-- | dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java | 220 | ||||
-rw-r--r-- | dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/PrioThreadFactory.java | 24 |
2 files changed, 107 insertions, 137 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java index fd488371..436982df 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java @@ -9,8 +9,10 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.apache.thrift.TException; @@ -18,6 +20,8 @@ import org.openslx.bwlp.thrift.iface.TInvalidTokenException; import org.openslx.dozmod.thrift.Session; import org.openslx.filetransfer.util.FileChunk; import org.openslx.thrifthelper.ThriftManager; +import org.openslx.util.GrowingThreadPoolExecutor; +import org.openslx.util.PrioThreadFactory; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; import org.openslx.util.Util; @@ -26,23 +30,32 @@ public class AsyncHashGenerator extends Thread { private static final Logger LOGGER = Logger.getLogger(AsyncHashGenerator.class); + private static final ThreadPoolExecutor pool = new GrowingThreadPoolExecutor(1, Runtime.getRuntime() + .availableProcessors(), 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(4), + new PrioThreadFactory("HashGen")); + + private static final ThreadLocal<MessageDigest> sha1 = new ThreadLocal<MessageDigest>() { + @Override + protected MessageDigest initialValue() { + try { + return MessageDigest.getInstance("SHA-1"); + } catch (NoSuchAlgorithmException e) { + LOGGER.warn("No SHA-1 MD available. Cannot hash file", e); + return null; + } + } + }; + private String uploadToken = null; + private int finishedBlocks = 0; private final RandomAccessFile file; private final List<ByteBuffer> blockHashes; - private final MessageDigest digester; private final List<FileChunk> list; - private final AsyncHashGenerator thisGenerator = this; private volatile boolean isCanceled = false; public AsyncHashGenerator(File uploadFile) throws FileNotFoundException, NoSuchAlgorithmException { try { - digester = MessageDigest.getInstance("SHA-1"); - } catch (NoSuchAlgorithmException e1) { - LOGGER.warn("No SHA-1 MD available. Cannot hash file", e1); - throw e1; - } - try { file = new RandomAccessFile(uploadFile, "r"); } catch (FileNotFoundException e) { LOGGER.warn("Could not open file for hash-checking. Will not send checksums to satellite", e); @@ -64,8 +77,6 @@ public class AsyncHashGenerator extends Thread { @Override public void run() { - Checker checker = new Checker(); - checker.start(); try { for (FileChunk chunk : list) { if (isCanceled) { @@ -80,33 +91,30 @@ public class AsyncHashGenerator extends Thread { block = new Block(chunk, buffer); } catch (IOException e) { LOGGER.warn("Could not read file chunk " + chunk.getChunkIndex() + ", skipping", e); - block = new Block(chunk, null); + block = new Block(chunk, new byte[0]); } if (isCanceled) { LOGGER.debug("Cancelled chunk reader (2)"); break; } - try { - checker.put(block); - // Don't hash too furiously in the background if the upload didn't start yet - if (uploadToken == null && chunk.range.startOffset > FileChunk.CHUNK_SIZE * 4) { - Util.sleep(200); + for (;;) { + if (pool.isTerminating() || pool.isTerminated()) { + Thread.currentThread().interrupt(); + return; + } + try { + pool.execute(block); + // Don't hash too furiously in the background if the upload didn't start yet + if (uploadToken == null && chunk.range.startOffset > FileChunk.CHUNK_SIZE * 4) { + Util.sleep(200); + } + } catch (RejectedExecutionException e) { + Util.sleep(100); + continue; } - } catch (InterruptedException e) { - LOGGER.info("Reader thread for hash checking got interrupted", e); - interrupt(); break; } } - try { - if (interrupted() || isCanceled) { - LOGGER.debug("Reader: Interrupting hasher"); - checker.interrupt(); - } - checker.join(); - } catch (InterruptedException e) { - // - } } finally { Util.safeClose(file); } @@ -117,7 +125,7 @@ public class AsyncHashGenerator extends Thread { isCanceled = true; } - private static class Block { + private class Block implements Runnable { public final FileChunk chunk; public final byte[] buffer; @@ -125,87 +133,72 @@ public class AsyncHashGenerator extends Thread { this.chunk = chunk; this.buffer = buffer; } - } - - private class Checker extends Thread { - private final BlockingQueue<Block> queue = new LinkedBlockingQueue<>(5); - - public Checker() { - setName("Hasher"); - setDaemon(true); + @Override + public void run() { + MessageDigest digester = sha1.get(); + digester.update(buffer, 0, chunk.range.getLength()); + byte[] hash = digester.digest(); + hashDone(chunk, hash); } + } - public void put(Block block) throws InterruptedException { - if (!isAlive()) { - Thread.currentThread().interrupt(); - return; + /** + * Called by worker thread when a block has been hashed. + * This means this method is not running in the currentAsyncHashGenerator + * thread but one of the workers. + * + * @param chunk + * @param hash + */ + private void hashDone(FileChunk chunk, byte[] hash) { + int blockIndex = chunk.getChunkIndex(); + synchronized (blockHashes) { + while (blockHashes.size() < blockIndex) { + blockHashes.add(null); + } + if (blockHashes.size() == blockIndex) { + blockHashes.add(ByteBuffer.wrap(hash)); + } else { + blockHashes.set(blockIndex, ByteBuffer.wrap(hash)); } - if (!queue.offer(block)) { - LOGGER.debug("Putting read block into hasher queue is blocking!"); - queue.put(block); - LOGGER.debug("Put succeeded"); + if (blockIndex == finishedBlocks) { + while (finishedBlocks < blockHashes.size() && blockHashes.get(finishedBlocks) != null) { + finishedBlocks++; + } } } - - @Override - public void run() { - try { - while (!interrupted() && !isCanceled) { - Block block = queue.take(); - if (isCanceled) { - LOGGER.debug("Hashing thread was cancelled"); + if (blockIndex % 20 == 0 || finishedBlocks == list.size()) { + if (blockIndex + 1 == list.size()) { + // Last block was hashed - make sure list gets to the server + LOGGER.debug("Hashing done"); + for (int i = 0; i < 10; ++i) { + if (submitHashes(true)) { + LOGGER.debug("Hashes sent to server"); break; } - final byte[] hash; - if (block.buffer == null) { - hash = new byte[0]; - } else { - digester.update(block.buffer, 0, block.chunk.range.getLength()); - hash = digester.digest(); + LOGGER.debug("Sending hashes failed..."); + try { + Thread.sleep(2000); + continue; + } catch (InterruptedException e) { + interrupt(); + return; } - int blockIndex = block.chunk.getChunkIndex(); - if (blockHashes.size() != blockIndex) { - LOGGER.warn("Inconsistent state: blockHashes.size() != currentBlockIndex"); + } + return; + } + // Mid-hashing - update server side + QuickTimer.scheduleOnce(new Task() { + @Override + public void fire() { + if (!submitHashes(false)) { + LOGGER.warn("Server rejected block hash list"); isCanceled = true; - break; - } - synchronized (blockHashes) { - blockHashes.add(ByteBuffer.wrap(hash)); - } - if (blockIndex % 20 == 0 || blockIndex + 1 == list.size()) { - if (blockIndex + 1 == list.size()) { - // Last block was hashed - make sure list gets to the server - LOGGER.debug("Hashing done"); - for (int i = 0; i < 10; ++i) { - if (submitHashes(true)) { - LOGGER.debug("Hashes sent to server"); - break; - } - LOGGER.debug("Sending hashes failed..."); - Thread.sleep(2000); - } - break; - } - // Mid-hashing - update server side - QuickTimer.scheduleOnce(new Task() { - @Override - public void fire() { - if (!submitHashes(false)) { - LOGGER.warn("Server rejected block hash list"); - isCanceled = true; - } - } - }); } } - } catch (InterruptedException e) { - LOGGER.debug("Hash checker thread got interrupted."); - interrupt(); - } finally { - thisGenerator.interrupt(); - } - } + }); + } } /** @@ -214,23 +207,24 @@ public class AsyncHashGenerator extends Thread { * @return false if the token is not known to the server */ private boolean submitHashes(boolean mustSucceed) { + List<ByteBuffer> subList; synchronized (blockHashes) { - if (uploadToken == null || blockHashes.isEmpty()) // No token yet, cannot submit, or empty list - return true; - try { - ThriftManager.getSatClient().updateBlockHashes(uploadToken, blockHashes, - Session.getSatelliteToken()); - } catch (TInvalidTokenException e) { - LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!"); - isCanceled = true; - return false; - } catch (TException e) { - LOGGER.warn("Unknown exception when submitting hashList to sat", e); - if (mustSucceed) - return false; - } + subList = blockHashes.subList(0, finishedBlocks); + } + if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list return true; + try { + ThriftManager.getSatClient().updateBlockHashes(uploadToken, subList, Session.getSatelliteToken()); + } catch (TInvalidTokenException e) { + LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!"); + isCanceled = true; + return false; + } catch (TException e) { + LOGGER.warn("Unknown exception when submitting hashList to sat", e); + if (mustSucceed) + return false; } + return true; } } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/PrioThreadFactory.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/PrioThreadFactory.java deleted file mode 100644 index 1cb2e196..00000000 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/PrioThreadFactory.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.openslx.bwlp.sat.util; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -public class PrioThreadFactory implements ThreadFactory { - - private final AtomicInteger counter = new AtomicInteger(); - private final String name; - private final int priority; - - public PrioThreadFactory(String name, int priority) { - this.name = name; - this.priority = priority; - } - - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r, name + "-" + counter.incrementAndGet()); - thread.setPriority(priority); - return thread; - } - -} |