author     Simon Rettberg  2022-03-21 17:22:16 +0100
committer  Simon Rettberg  2022-03-21 17:22:16 +0100
commit     ad0788e8fbead90d1ab03ba1a5c83b00114cb3a0 (patch)
tree       af62b617fb6149afce16d417474abd49f373cbf9 /dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
parent     [client] Cleanup chunk data lists when upload finished or is cancelled (diff)
[client] Memory management; handle OOM when hashing, do not skip blocks
Try to free transfer-related references earlier, e.g. drop the hash worker and the list of hashes as soon as hashing finishes on an upload, instead of only when the upload is finished and the window is closed. Properly delay hashing of blocks in OOM scenarios instead of skipping them, and be more conservative with the number of hash workers, i.e. take the maximum JVM memory into account. Also, improve thread naming.
Diffstat (limited to 'dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java')
-rw-r--r--  dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java | 81
1 file changed, 59 insertions(+), 22 deletions(-)
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
index cf593200..8ef12e12 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
@@ -13,6 +13,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -29,8 +30,9 @@ public class AsyncHashGenerator extends Thread {
private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class);
private static final ThreadPoolExecutor HASH_WORK_POOL = new GrowingThreadPoolExecutor(1,
- Math.max(1, Runtime.getRuntime().availableProcessors() - 1),
- 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2),
+ Math.max(1, (int)Math.min(Runtime.getRuntime().availableProcessors() - 1,
+ Runtime.getRuntime().maxMemory() / (FileChunk.CHUNK_SIZE * 3))),
+ 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1),
new PrioThreadFactory("HashGen"), new ThreadPoolExecutor.CallerRunsPolicy());
private static final ThreadLocal<MessageDigest> SHA1_DIGESTER = new ThreadLocal<MessageDigest>() {
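Note: for reference, a minimal standalone sketch of the new sizing rule, assuming a chunk size of 16 MiB purely for illustration (the real bound uses FileChunk.CHUNK_SIZE):

    public class PoolSizingSketch {
        // Assumed chunk size for this sketch; the actual constant lives in FileChunk.
        static final long CHUNK_SIZE = 16L * 1024 * 1024;

        static int maxHashWorkers() {
            // CPU bound: leave one core for the reader thread and the rest of the app.
            int byCpu = Runtime.getRuntime().availableProcessors() - 1;
            // Memory bound: budget roughly three chunk-sized buffers per worker
            // before memory becomes the limiting factor.
            long byMem = Runtime.getRuntime().maxMemory() / (CHUNK_SIZE * 3);
            return Math.max(1, (int) Math.min(byCpu, byMem));
        }

        public static void main(String[] args) {
            System.out.println("Hash workers: " + maxHashWorkers());
        }
    }

Shrinking the work queue to a single slot matters here too: with CallerRunsPolicy, a full queue makes the submitting reader thread hash the block itself, throttling reads to the pool's actual throughput.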
@@ -52,9 +54,13 @@ public class AsyncHashGenerator extends Thread {
private final List<FileChunk> chunkList;
private long nextReadMsg, nextDoneMsg, nextSendingMsg; // for debug spam :-(
private boolean readingDone = false;
- private boolean hashingDone = false;
+ private AtomicInteger pendingHashes = new AtomicInteger();
private volatile boolean isCanceled = false;
+
+ static {
+ LOGGER.info("Using " + HASH_WORK_POOL.getMaximumPoolSize() + " hash workers.");
+ }
public AsyncHashGenerator(File uploadFile) throws FileNotFoundException, NoSuchAlgorithmException {
try {
@@ -70,6 +76,15 @@ public class AsyncHashGenerator extends Thread {
setDaemon(true);
setName("HashGenerator");
}
+
+ @Override
+ public synchronized void start() {
+ if (isCanceled) {
+ LOGGER.warn("Cannot start hashing if it has been cancelled before");
+ } else {
+ super.start();
+ }
+ }
public void setUploadToken(String token) {
if (!isCanceled && this.uploadToken == null) {
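Note: the overridden start() pairs with the synchronized cancel() further down. Both take the instance monitor, so a cancel() arriving before start() can never lose the race and leave a thread running. A minimal sketch of the pattern, with hypothetical names:

    class GuardedWorker extends Thread {
        private volatile boolean isCanceled = false;

        @Override
        public synchronized void start() {
            if (isCanceled) {
                return; // cancel() already ran; never spawn the thread
            }
            super.start();
        }

        public synchronized void cancel() {
            // Serialized against start() via the same monitor: after this returns,
            // the thread is either running (and sees the flag) or never starts.
            isCanceled = true;
        }
    }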
@@ -90,14 +105,15 @@ public class AsyncHashGenerator extends Thread {
}
Block block;
try {
- byte[] buffer;
- try {
- buffer = new byte[chunk.range.getLength()];
- } catch (OutOfMemoryError e) {
- LOGGER.info("Low memory - slowing down hashing");
- Util.sleep(5000);
- continue;
- }
+ byte[] buffer = null;
+ do {
+ try {
+ buffer = new byte[chunk.range.getLength()];
+ } catch (OutOfMemoryError e) {
+ LOGGER.info("Low memory - slowing down hashing");
+ Util.sleep(5000);
+ }
+ } while (buffer == null);
file.seek(chunk.range.startOffset);
file.readFully(buffer);
block = new Block(chunk, buffer);
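Note: instead of skipping a chunk when allocation fails (the old code's continue dropped the block), the buffer allocation now retries until it succeeds. A compact sketch of that allocate-with-backoff loop, with an assumed backoff interval:

    class BackoffAlloc {
        static byte[] allocateBlocking(int size, long backoffMs) throws InterruptedException {
            for (;;) {
                try {
                    return new byte[size];
                } catch (OutOfMemoryError e) {
                    // Other hash workers release their buffers as they finish;
                    // wait for memory to free up rather than dropping the chunk.
                    Thread.sleep(backoffMs);
                }
            }
        }
    }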
@@ -111,23 +127,26 @@ public class AsyncHashGenerator extends Thread {
}
//
if (System.currentTimeMillis() > nextReadMsg) {
- nextReadMsg = System.currentTimeMillis() + 30000;
+ nextReadMsg = System.currentTimeMillis() + 60000;
LOGGER.debug("Read chunk " + chunk.getChunkIndex());
}
//
for (;;) {
if (HASH_WORK_POOL.isTerminating() || HASH_WORK_POOL.isTerminated() || HASH_WORK_POOL.isShutdown()) {
LOGGER.warn("Aborting current hash job - pool has shut down");
+ isCanceled = true;
Thread.currentThread().interrupt();
return;
}
try {
+ pendingHashes.incrementAndGet();
HASH_WORK_POOL.execute(block);
// Don't hash too furiously in the background if the upload didn't start yet
if (uploadToken == null && chunk.getChunkIndex() > 4) {
Util.sleep(200);
}
} catch (RejectedExecutionException e) {
+ pendingHashes.decrementAndGet();
LOGGER.warn("Hash pool worker rejected a hash job!? Retrying...");
Util.sleep(1000);
continue;
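Note: pendingHashes replaces the old hashingDone flag with exact in-flight accounting: increment before handing a block to the pool, roll back if the pool rejects it, decrement when the hash completes. A generic sketch of the pattern, with hypothetical names:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.atomic.AtomicInteger;

    class InFlightCounter {
        private final AtomicInteger pending = new AtomicInteger();
        private final ExecutorService pool = Executors.newFixedThreadPool(2);

        void submit(Runnable task) {
            pending.incrementAndGet(); // count optimistically, before execute()
            try {
                pool.execute(() -> {
                    try {
                        task.run();
                    } finally {
                        if (pending.decrementAndGet() == 0) {
                            // Last outstanding task: shared state can be released.
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                pending.decrementAndGet(); // undo the optimistic increment
            }
        }
    }

Counting before execute() is deliberate: with CallerRunsPolicy the task may run synchronously inside execute(), so incrementing afterwards could let the counter hit zero while work is still outstanding.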
@@ -142,7 +161,7 @@ public class AsyncHashGenerator extends Thread {
}
}
- public void cancel() {
+ public synchronized void cancel() {
LOGGER.debug("Cancelled externally");
isCanceled = true;
}
@@ -152,7 +171,7 @@ public class AsyncHashGenerator extends Thread {
*/
private class Block implements Runnable {
public final FileChunk chunk;
- public final byte[] buffer;
+ public byte[] buffer;
public Block(FileChunk chunk, byte[] buffer) {
this.chunk = chunk;
@@ -163,7 +182,14 @@ public class AsyncHashGenerator extends Thread {
public void run() {
MessageDigest digester = SHA1_DIGESTER.get();
digester.update(buffer, 0, chunk.range.getLength());
+ this.buffer = null; // Clear reference before calling function below
byte[] hash = digester.digest();
+ synchronized (this) {
+ if (isCanceled) {
+ pendingHashes.decrementAndGet();
+ return;
+ }
+ }
hashDone(chunk, hash);
}
}
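Note: making buffer non-final lets run() drop the last reference to the chunk data as soon as the digester has consumed it, so the multi-megabyte array becomes collectible before the hash is processed further. A sketch of that early-release idea, with hypothetical names:

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    class HashJob implements Runnable {
        private byte[] buffer; // deliberately not final, so it can be released early

        HashJob(byte[] buffer) {
            this.buffer = buffer;
        }

        @Override
        public void run() {
            try {
                MessageDigest digester = MessageDigest.getInstance("SHA-1");
                digester.update(buffer);
                buffer = null; // last use: let the GC reclaim the chunk now
                byte[] hash = digester.digest();
                // ... hand the hash off for submission ...
            } catch (NoSuchAlgorithmException e) {
                throw new AssertionError(e); // SHA-1 is available on every JRE
            }
        }
    }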
@@ -180,7 +206,7 @@ public class AsyncHashGenerator extends Thread {
int chunkIndex = chunk.getChunkIndex();
boolean wasLastChunk = false;
if (System.currentTimeMillis() > nextDoneMsg) {
- nextDoneMsg = System.currentTimeMillis() + 30000;
+ nextDoneMsg = System.currentTimeMillis() + 60000;
LOGGER.debug("Done hashing chunk " + chunkIndex);
}
synchronized (chunkHashes) {
@@ -224,8 +250,7 @@ public class AsyncHashGenerator extends Thread {
isCanceled = true;
}
}
- if (wasLastChunk) {
- hashingDone = true;
+ if (pendingHashes.decrementAndGet() == 0) {
cleanupIfDone();
}
}
@@ -235,15 +260,25 @@ public class AsyncHashGenerator extends Thread {
* a reference to this class, at least we will not prevent this stuff from being
* garbage collected.
*/
- private void cleanupIfDone() {
- if (uploadToken == null && !isCanceled)
+ private synchronized void cleanupIfDone() {
+ if (!readingDone && isAlive())
return;
- if (!readingDone)
+ if (uploadToken == null && !isCanceled)
return;
- if (!hashingDone && !isCanceled)
+ if (pendingHashes.get() != 0)
return;
+ isCanceled = true;
chunkHashes.clear();
chunkList.clear();
+ LOGGER.debug("Hasher cleaned up");
+ }
+
+ /**
+ * @return true if this instance is not doing anything meaningful anymore
+ * and no reference to it needs to be kept around.
+ */
+ public boolean canBeDiscarded() {
+ return isCanceled || (!isAlive() && pendingHashes.get() == 0);
}
/**
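Note: canBeDiscarded() gives the owning window a cheap way to decide when to drop its reference to the generator. A possible caller-side usage, with hypothetical names (a window holding a hasher field and polling it periodically):

    // e.g. in the upload window's periodic cleanup:
    private AsyncHashGenerator hasher;

    private void maybeReleaseHasher() {
        if (hasher != null && hasher.canBeDiscarded()) {
            hasher = null; // thread finished, nothing pending: let the GC take it
        }
    }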
@@ -252,6 +287,8 @@ public class AsyncHashGenerator extends Thread {
* @return false if the token is not known to the server
*/
private boolean submitHashes(boolean mustSucceed) {
+ if (isCanceled)
+ return true;
List<ByteBuffer> subList;
boolean d;
synchronized (chunkHashes) {
@@ -262,7 +299,7 @@ public class AsyncHashGenerator extends Thread {
d = System.currentTimeMillis() > nextSendingMsg;
}
if (d) {
- nextSendingMsg = System.currentTimeMillis() + 30000;
+ nextSendingMsg = System.currentTimeMillis() + 60000;
LOGGER.debug("Preparing to send hash list to server (" + subList.size() + " / " + (uploadToken != null) + ")");
}
if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list