summaryrefslogtreecommitdiffstats
path: root/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
diff options
context:
space:
mode:
Diffstat (limited to 'dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java')
-rw-r--r--dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java81
1 files changed, 59 insertions, 22 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
index cf593200..8ef12e12 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
@@ -13,6 +13,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -29,8 +30,9 @@ public class AsyncHashGenerator extends Thread {
private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class);
private static final ThreadPoolExecutor HASH_WORK_POOL = new GrowingThreadPoolExecutor(1,
- Math.max(1, Runtime.getRuntime().availableProcessors() - 1),
- 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2),
+ Math.max(1, (int)Math.min(Runtime.getRuntime().availableProcessors() - 1,
+ Runtime.getRuntime().maxMemory() / (FileChunk.CHUNK_SIZE * 3))),
+ 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1),
new PrioThreadFactory("HashGen"), new ThreadPoolExecutor.CallerRunsPolicy());
private static final ThreadLocal<MessageDigest> SHA1_DIGESTER = new ThreadLocal<MessageDigest>() {
@@ -52,9 +54,13 @@ public class AsyncHashGenerator extends Thread {
private final List<FileChunk> chunkList;
private long nextReadMsg, nextDoneMsg, nextSendingMsg; // for debug spam :-(
private boolean readingDone = false;
- private boolean hashingDone = false;
+ private AtomicInteger pendingHashes = new AtomicInteger();
private volatile boolean isCanceled = false;
+
+ static {
+ LOGGER.info("Using " + HASH_WORK_POOL.getMaximumPoolSize() + " hash workers.");
+ }
public AsyncHashGenerator(File uploadFile) throws FileNotFoundException, NoSuchAlgorithmException {
try {
@@ -70,6 +76,15 @@ public class AsyncHashGenerator extends Thread {
setDaemon(true);
setName("HashGenerator");
}
+
+ @Override
+ public synchronized void start() {
+ if (isCanceled) {
+ LOGGER.warn("Cannot start hashing if it has been cancelled before");
+ } else {
+ super.start();
+ }
+ }
public void setUploadToken(String token) {
if (!isCanceled && this.uploadToken == null) {
@@ -90,14 +105,15 @@ public class AsyncHashGenerator extends Thread {
}
Block block;
try {
- byte[] buffer;
- try {
- buffer = new byte[chunk.range.getLength()];
- } catch (OutOfMemoryError e) {
- LOGGER.info("Low memory - slowing down hashing");
- Util.sleep(5000);
- continue;
- }
+ byte[] buffer = null;
+ do {
+ try {
+ buffer = new byte[chunk.range.getLength()];
+ } catch (OutOfMemoryError e) {
+ LOGGER.info("Low memory - slowing down hashing");
+ Util.sleep(5000);
+ }
+ } while (buffer == null);
file.seek(chunk.range.startOffset);
file.readFully(buffer);
block = new Block(chunk, buffer);
@@ -111,23 +127,26 @@ public class AsyncHashGenerator extends Thread {
}
//
if (System.currentTimeMillis() > nextReadMsg) {
- nextReadMsg = System.currentTimeMillis() + 30000;
+ nextReadMsg = System.currentTimeMillis() + 60000;
LOGGER.debug("Read chunk " + chunk.getChunkIndex());
}
//
for (;;) {
if (HASH_WORK_POOL.isTerminating() || HASH_WORK_POOL.isTerminated() || HASH_WORK_POOL.isShutdown()) {
LOGGER.warn("Aborting current hash job - pool has shut down");
+ isCanceled = true;
Thread.currentThread().interrupt();
return;
}
try {
+ pendingHashes.incrementAndGet();
HASH_WORK_POOL.execute(block);
// Don't hash too furiously in the background if the upload didn't start yet
if (uploadToken == null && chunk.getChunkIndex() > 4) {
Util.sleep(200);
}
} catch (RejectedExecutionException e) {
+ pendingHashes.decrementAndGet();
LOGGER.warn("Hash pool worker rejected a hash job!? Retrying...");
Util.sleep(1000);
continue;
@@ -142,7 +161,7 @@ public class AsyncHashGenerator extends Thread {
}
}
- public void cancel() {
+ public synchronized void cancel() {
LOGGER.debug("Cancelled externally");
isCanceled = true;
}
@@ -152,7 +171,7 @@ public class AsyncHashGenerator extends Thread {
*/
private class Block implements Runnable {
public final FileChunk chunk;
- public final byte[] buffer;
+ public byte[] buffer;
public Block(FileChunk chunk, byte[] buffer) {
this.chunk = chunk;
@@ -163,7 +182,14 @@ public class AsyncHashGenerator extends Thread {
public void run() {
MessageDigest digester = SHA1_DIGESTER.get();
digester.update(buffer, 0, chunk.range.getLength());
+ this.buffer = null; // Clear reference before calling function below
byte[] hash = digester.digest();
+ synchronized (this) {
+ if (isCanceled) {
+ pendingHashes.decrementAndGet();
+ return;
+ }
+ }
hashDone(chunk, hash);
}
}
@@ -180,7 +206,7 @@ public class AsyncHashGenerator extends Thread {
int chunkIndex = chunk.getChunkIndex();
boolean wasLastChunk = false;
if (System.currentTimeMillis() > nextDoneMsg) {
- nextDoneMsg = System.currentTimeMillis() + 30000;
+ nextDoneMsg = System.currentTimeMillis() + 60000;
LOGGER.debug("Done hashing chunk " + chunkIndex);
}
synchronized (chunkHashes) {
@@ -224,8 +250,7 @@ public class AsyncHashGenerator extends Thread {
isCanceled = true;
}
}
- if (wasLastChunk) {
- hashingDone = true;
+ if (pendingHashes.decrementAndGet() == 0) {
cleanupIfDone();
}
}
@@ -235,15 +260,25 @@ public class AsyncHashGenerator extends Thread {
* a reference to this class, at least we will not prevent this stuff from being
* garbage collected.
*/
- private void cleanupIfDone() {
- if (uploadToken == null && !isCanceled)
+ private synchronized void cleanupIfDone() {
+ if (!readingDone && isAlive())
return;
- if (!readingDone)
+ if (uploadToken == null && !isCanceled)
return;
- if (!hashingDone && !isCanceled)
+ if (pendingHashes.get() != 0)
return;
+ isCanceled = true;
chunkHashes.clear();
chunkList.clear();
+ LOGGER.debug("Hasher cleaned up");
+ }
+
+ /**
+ * @return true if this instance is not dong anything meaningful anymore
+ * and no reference to it needs to be kept around.
+ */
+ public boolean canBeDiscarded() {
+ return isCanceled || (!isAlive() && pendingHashes.get() == 0);
}
/**
@@ -252,6 +287,8 @@ public class AsyncHashGenerator extends Thread {
* @return false if the token is not known to the server
*/
private boolean submitHashes(boolean mustSucceed) {
+ if (isCanceled)
+ return true;
List<ByteBuffer> subList;
boolean d;
synchronized (chunkHashes) {
@@ -262,7 +299,7 @@ public class AsyncHashGenerator extends Thread {
d = System.currentTimeMillis() > nextSendingMsg;
}
if (d) {
- nextSendingMsg = System.currentTimeMillis() + 30000;
+ nextSendingMsg = System.currentTimeMillis() + 60000;
LOGGER.debug("Preparing to send hash list to server (" + subList.size() + " / " + (uploadToken != null) + ")");
}
if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list