summaryrefslogtreecommitdiffstats
path: root/dozentenmodul/src/main/java
diff options
context:
space:
mode:
authorSimon Rettberg2018-05-14 15:42:45 +0200
committerSimon Rettberg2018-05-14 15:42:45 +0200
commit39647236b389f311d071d7093c938163ea757dd4 (patch)
treeb4a46a780ccbc262da715a2673869fea267963fe /dozentenmodul/src/main/java
parent[client] UploadPanel: Add checkbox to toggle server side copying (diff)
downloadtutor-module-39647236b389f311d071d7093c938163ea757dd4.tar.gz
tutor-module-39647236b389f311d071d7093c938163ea757dd4.tar.xz
tutor-module-39647236b389f311d071d7093c938163ea757dd4.zip
[client] Speed up hashing
Diffstat (limited to 'dozentenmodul/src/main/java')
-rw-r--r--dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java220
1 file changed, 107 insertions, 113 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
index fd488371..436982df 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
@@ -9,8 +9,10 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
@@ -18,6 +20,8 @@ import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.dozmod.thrift.Session;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.thrifthelper.ThriftManager;
+import org.openslx.util.GrowingThreadPoolExecutor;
+import org.openslx.util.PrioThreadFactory;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;
import org.openslx.util.Util;
@@ -26,23 +30,32 @@ public class AsyncHashGenerator extends Thread {
private static final Logger LOGGER = Logger.getLogger(AsyncHashGenerator.class);
+ private static final ThreadPoolExecutor pool = new GrowingThreadPoolExecutor(1, Runtime.getRuntime()
+ .availableProcessors(), 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(4),
+ new PrioThreadFactory("HashGen"));
+
+ private static final ThreadLocal<MessageDigest> sha1 = new ThreadLocal<MessageDigest>() {
+ @Override
+ protected MessageDigest initialValue() {
+ try {
+ return MessageDigest.getInstance("SHA-1");
+ } catch (NoSuchAlgorithmException e) {
+ LOGGER.warn("No SHA-1 MD available. Cannot hash file", e);
+ return null;
+ }
+ }
+ };
+
private String uploadToken = null;
+ private int finishedBlocks = 0;
private final RandomAccessFile file;
private final List<ByteBuffer> blockHashes;
- private final MessageDigest digester;
private final List<FileChunk> list;
- private final AsyncHashGenerator thisGenerator = this;
private volatile boolean isCanceled = false;
public AsyncHashGenerator(File uploadFile) throws FileNotFoundException, NoSuchAlgorithmException {
try {
- digester = MessageDigest.getInstance("SHA-1");
- } catch (NoSuchAlgorithmException e1) {
- LOGGER.warn("No SHA-1 MD available. Cannot hash file", e1);
- throw e1;
- }
- try {
file = new RandomAccessFile(uploadFile, "r");
} catch (FileNotFoundException e) {
LOGGER.warn("Could not open file for hash-checking. Will not send checksums to satellite", e);
@@ -64,8 +77,6 @@ public class AsyncHashGenerator extends Thread {
@Override
public void run() {
- Checker checker = new Checker();
- checker.start();
try {
for (FileChunk chunk : list) {
if (isCanceled) {
@@ -80,33 +91,30 @@ public class AsyncHashGenerator extends Thread {
block = new Block(chunk, buffer);
} catch (IOException e) {
LOGGER.warn("Could not read file chunk " + chunk.getChunkIndex() + ", skipping", e);
- block = new Block(chunk, null);
+ block = new Block(chunk, new byte[0]);
}
if (isCanceled) {
LOGGER.debug("Cancelled chunk reader (2)");
break;
}
- try {
- checker.put(block);
- // Don't hash too furiously in the background if the upload didn't start yet
- if (uploadToken == null && chunk.range.startOffset > FileChunk.CHUNK_SIZE * 4) {
- Util.sleep(200);
+ for (;;) {
+ if (pool.isTerminating() || pool.isTerminated()) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ try {
+ pool.execute(block);
+ // Don't hash too furiously in the background if the upload didn't start yet
+ if (uploadToken == null && chunk.range.startOffset > FileChunk.CHUNK_SIZE * 4) {
+ Util.sleep(200);
+ }
+ } catch (RejectedExecutionException e) {
+ Util.sleep(100);
+ continue;
}
- } catch (InterruptedException e) {
- LOGGER.info("Reader thread for hash checking got interrupted", e);
- interrupt();
break;
}
}
- try {
- if (interrupted() || isCanceled) {
- LOGGER.debug("Reader: Interrupting hasher");
- checker.interrupt();
- }
- checker.join();
- } catch (InterruptedException e) {
- //
- }
} finally {
Util.safeClose(file);
}
@@ -117,7 +125,7 @@ public class AsyncHashGenerator extends Thread {
isCanceled = true;
}
- private static class Block {
+ private class Block implements Runnable {
public final FileChunk chunk;
public final byte[] buffer;
@@ -125,87 +133,72 @@ public class AsyncHashGenerator extends Thread {
this.chunk = chunk;
this.buffer = buffer;
}
- }
-
- private class Checker extends Thread {
- private final BlockingQueue<Block> queue = new LinkedBlockingQueue<>(5);
-
- public Checker() {
- setName("Hasher");
- setDaemon(true);
+ @Override
+ public void run() {
+ MessageDigest digester = sha1.get();
+ digester.update(buffer, 0, chunk.range.getLength());
+ byte[] hash = digester.digest();
+ hashDone(chunk, hash);
}
+ }
- public void put(Block block) throws InterruptedException {
- if (!isAlive()) {
- Thread.currentThread().interrupt();
- return;
+ /**
+ * Called by worker thread when a block has been hashed.
+ * This means this method is not running in the current AsyncHashGenerator
+ * thread but one of the workers.
+ *
+ * @param chunk
+ * @param hash
+ */
+ private void hashDone(FileChunk chunk, byte[] hash) {
+ int blockIndex = chunk.getChunkIndex();
+ synchronized (blockHashes) {
+ while (blockHashes.size() < blockIndex) {
+ blockHashes.add(null);
+ }
+ if (blockHashes.size() == blockIndex) {
+ blockHashes.add(ByteBuffer.wrap(hash));
+ } else {
+ blockHashes.set(blockIndex, ByteBuffer.wrap(hash));
}
- if (!queue.offer(block)) {
- LOGGER.debug("Putting read block into hasher queue is blocking!");
- queue.put(block);
- LOGGER.debug("Put succeeded");
+ if (blockIndex == finishedBlocks) {
+ while (finishedBlocks < blockHashes.size() && blockHashes.get(finishedBlocks) != null) {
+ finishedBlocks++;
+ }
}
}
-
- @Override
- public void run() {
- try {
- while (!interrupted() && !isCanceled) {
- Block block = queue.take();
- if (isCanceled) {
- LOGGER.debug("Hashing thread was cancelled");
+ if (blockIndex % 20 == 0 || finishedBlocks == list.size()) {
+ if (blockIndex + 1 == list.size()) {
+ // Last block was hashed - make sure list gets to the server
+ LOGGER.debug("Hashing done");
+ for (int i = 0; i < 10; ++i) {
+ if (submitHashes(true)) {
+ LOGGER.debug("Hashes sent to server");
break;
}
- final byte[] hash;
- if (block.buffer == null) {
- hash = new byte[0];
- } else {
- digester.update(block.buffer, 0, block.chunk.range.getLength());
- hash = digester.digest();
+ LOGGER.debug("Sending hashes failed...");
+ try {
+ Thread.sleep(2000);
+ continue;
+ } catch (InterruptedException e) {
+ interrupt();
+ return;
}
- int blockIndex = block.chunk.getChunkIndex();
- if (blockHashes.size() != blockIndex) {
- LOGGER.warn("Inconsistent state: blockHashes.size() != currentBlockIndex");
+ }
+ return;
+ }
+ // Mid-hashing - update server side
+ QuickTimer.scheduleOnce(new Task() {
+ @Override
+ public void fire() {
+ if (!submitHashes(false)) {
+ LOGGER.warn("Server rejected block hash list");
isCanceled = true;
- break;
- }
- synchronized (blockHashes) {
- blockHashes.add(ByteBuffer.wrap(hash));
- }
- if (blockIndex % 20 == 0 || blockIndex + 1 == list.size()) {
- if (blockIndex + 1 == list.size()) {
- // Last block was hashed - make sure list gets to the server
- LOGGER.debug("Hashing done");
- for (int i = 0; i < 10; ++i) {
- if (submitHashes(true)) {
- LOGGER.debug("Hashes sent to server");
- break;
- }
- LOGGER.debug("Sending hashes failed...");
- Thread.sleep(2000);
- }
- break;
- }
- // Mid-hashing - update server side
- QuickTimer.scheduleOnce(new Task() {
- @Override
- public void fire() {
- if (!submitHashes(false)) {
- LOGGER.warn("Server rejected block hash list");
- isCanceled = true;
- }
- }
- });
}
}
- } catch (InterruptedException e) {
- LOGGER.debug("Hash checker thread got interrupted.");
- interrupt();
- } finally {
- thisGenerator.interrupt();
- }
- }
+ });
+ }
}
/**
@@ -214,23 +207,24 @@ public class AsyncHashGenerator extends Thread {
* @return false if the token is not known to the server
*/
private boolean submitHashes(boolean mustSucceed) {
+ List<ByteBuffer> subList;
synchronized (blockHashes) {
- if (uploadToken == null || blockHashes.isEmpty()) // No token yet, cannot submit, or empty list
- return true;
- try {
- ThriftManager.getSatClient().updateBlockHashes(uploadToken, blockHashes,
- Session.getSatelliteToken());
- } catch (TInvalidTokenException e) {
- LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!");
- isCanceled = true;
- return false;
- } catch (TException e) {
- LOGGER.warn("Unknown exception when submitting hashList to sat", e);
- if (mustSucceed)
- return false;
- }
+ subList = blockHashes.subList(0, finishedBlocks);
+ }
+ if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list
return true;
+ try {
+ ThriftManager.getSatClient().updateBlockHashes(uploadToken, subList, Session.getSatelliteToken());
+ } catch (TInvalidTokenException e) {
+ LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!");
+ isCanceled = true;
+ return false;
+ } catch (TException e) {
+ LOGGER.warn("Unknown exception when submitting hashList to sat", e);
+ if (mustSucceed)
+ return false;
}
+ return true;
}
}