summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2022-03-18 15:23:36 +0100
committerSimon Rettberg2022-03-18 15:23:36 +0100
commit3fcafcc65b05df16c08d57128b899b9ad88f65d8 (patch)
treef39c24d80da00831677da5314cfa7009d15ff100
parent[client] Update deprecated call (diff)
downloadtutor-module-3fcafcc65b05df16c08d57128b899b9ad88f65d8.tar.gz
tutor-module-3fcafcc65b05df16c08d57128b899b9ad88f65d8.tar.xz
tutor-module-3fcafcc65b05df16c08d57128b899b9ad88f65d8.zip
[client] Add even more debug spam to Upload
-rw-r--r--dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java154
-rw-r--r--dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java22
-rw-r--r--dozentenmodul/src/main/java/org/openslx/dozmod/gui/wizard/page/ImageUploadSummaryPage.java1
3 files changed, 106 insertions, 71 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
index e427d064..83e89302 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
@@ -16,26 +16,24 @@ import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.dozmod.thrift.Session;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.GrowingThreadPoolExecutor;
import org.openslx.util.PrioThreadFactory;
-import org.openslx.util.QuickTimer;
-import org.openslx.util.QuickTimer.Task;
import org.openslx.util.Util;
public class AsyncHashGenerator extends Thread {
private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class);
- private static final ThreadPoolExecutor pool = new GrowingThreadPoolExecutor(1, Runtime.getRuntime()
- .availableProcessors(), 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(4),
- new PrioThreadFactory("HashGen"));
+ private static final ThreadPoolExecutor HASH_WORK_POOL = new GrowingThreadPoolExecutor(1,
+ Math.max(1, Runtime.getRuntime().availableProcessors() - 1),
+ 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2),
+ new PrioThreadFactory("HashGen"), new ThreadPoolExecutor.CallerRunsPolicy());
- private static final ThreadLocal<MessageDigest> sha1 = new ThreadLocal<MessageDigest>() {
+ private static final ThreadLocal<MessageDigest> SHA1_DIGESTER = new ThreadLocal<MessageDigest>() {
@Override
protected MessageDigest initialValue() {
try {
@@ -48,10 +46,11 @@ public class AsyncHashGenerator extends Thread {
};
private String uploadToken = null;
- private int finishedBlocks = 0;
+ private int finishedChunks = 0;
private final RandomAccessFile file;
- private final List<ByteBuffer> blockHashes;
- private final List<FileChunk> list;
+ private final List<ByteBuffer> chunkHashes;
+ private final List<FileChunk> chunkList;
+ private long nextReadMsg, nextDoneMsg, nextSendingMsg; // for debug spam :-(
private volatile boolean isCanceled = false;
@@ -62,9 +61,10 @@ public class AsyncHashGenerator extends Thread {
LOGGER.warn("Could not open file for hash-checking. Will not send checksums to satellite", e);
throw e;
}
- list = new ArrayList<>();
- FileChunk.createChunkList(list, uploadFile.length(), null);
- blockHashes = new ArrayList<>(list.size());
+ LOGGER.debug("Opened file for hashing");
+ chunkList = new ArrayList<>();
+ FileChunk.createChunkList(chunkList, uploadFile.length(), null);
+ chunkHashes = new ArrayList<>(chunkList.size());
setDaemon(true);
setName("HashGenerator");
}
@@ -78,15 +78,23 @@ public class AsyncHashGenerator extends Thread {
@Override
public void run() {
+ LOGGER.debug("Started hash reader worker");
try {
- for (FileChunk chunk : list) {
+ for (FileChunk chunk : chunkList) {
if (isCanceled) {
LOGGER.debug("Cancelled chunk reader (1)");
break;
}
Block block;
try {
- byte[] buffer = new byte[chunk.range.getLength()];
+ byte[] buffer;
+ try {
+ buffer = new byte[chunk.range.getLength()];
+ } catch (OutOfMemoryError e) {
+ LOGGER.info("Low memory - slowing down hashing");
+ Util.sleep(5000);
+ continue;
+ }
file.seek(chunk.range.startOffset);
file.readFully(buffer);
block = new Block(chunk, buffer);
@@ -98,19 +106,27 @@ public class AsyncHashGenerator extends Thread {
LOGGER.debug("Cancelled chunk reader (2)");
break;
}
+ //
+ if (System.currentTimeMillis() > nextReadMsg) {
+ nextReadMsg = System.currentTimeMillis() + 30000;
+ LOGGER.debug("Read chunk " + chunk.getChunkIndex());
+ }
+ //
for (;;) {
- if (pool.isTerminating() || pool.isTerminated()) {
+ if (HASH_WORK_POOL.isTerminating() || HASH_WORK_POOL.isTerminated() || HASH_WORK_POOL.isShutdown()) {
+ LOGGER.warn("Aborting current hash job - pool has shut down");
Thread.currentThread().interrupt();
return;
}
try {
- pool.execute(block);
+ HASH_WORK_POOL.execute(block);
// Don't hash too furiously in the background if the upload didn't start yet
- if (uploadToken == null && chunk.range.startOffset > FileChunk.CHUNK_SIZE * 4) {
+ if (uploadToken == null && chunk.getChunkIndex() > 4) {
Util.sleep(200);
}
} catch (RejectedExecutionException e) {
- Util.sleep(100);
+ LOGGER.warn("Hash pool worker rejected a hash job!? Retrying...");
+ Util.sleep(1000);
continue;
}
break;
@@ -126,6 +142,9 @@ public class AsyncHashGenerator extends Thread {
isCanceled = true;
}
+ /**
+ * Worker for hashing chunk. Processed via thread pool.
+ */
private class Block implements Runnable {
public final FileChunk chunk;
public final byte[] buffer;
@@ -137,7 +156,7 @@ public class AsyncHashGenerator extends Thread {
@Override
public void run() {
- MessageDigest digester = sha1.get();
+ MessageDigest digester = SHA1_DIGESTER.get();
digester.update(buffer, 0, chunk.range.getLength());
byte[] hash = digester.digest();
hashDone(chunk, hash);
@@ -145,7 +164,7 @@ public class AsyncHashGenerator extends Thread {
}
/**
- * Called by worker thread when a block has been hashed.
+ * Called by worker thread when a chunk has been hashed.
* This means this method is not running in the currentAsyncHashGenerator
* thread but one of the workers.
*
@@ -153,53 +172,53 @@ public class AsyncHashGenerator extends Thread {
* @param hash
*/
private void hashDone(FileChunk chunk, byte[] hash) {
- int blockIndex = chunk.getChunkIndex();
- synchronized (blockHashes) {
- while (blockHashes.size() < blockIndex) {
- blockHashes.add(null);
+ int chunkIndex = chunk.getChunkIndex();
+ boolean wasLastChunk = false;
+ if (System.currentTimeMillis() > nextDoneMsg) {
+ nextDoneMsg = System.currentTimeMillis() + 30000;
+ LOGGER.debug("Done hashing chunk " + chunkIndex);
+ }
+ synchronized (chunkHashes) {
+ while (chunkHashes.size() < chunkIndex) {
+ chunkHashes.add(null);
}
- if (blockHashes.size() == blockIndex) {
- blockHashes.add(ByteBuffer.wrap(hash));
+ if (chunkHashes.size() == chunkIndex) {
+ chunkHashes.add(ByteBuffer.wrap(hash));
} else {
- blockHashes.set(blockIndex, ByteBuffer.wrap(hash));
+ chunkHashes.set(chunkIndex, ByteBuffer.wrap(hash));
}
- if (blockIndex == finishedBlocks) {
- while (finishedBlocks < blockHashes.size() && blockHashes.get(finishedBlocks) != null) {
- finishedBlocks++;
+ if (chunkIndex == finishedChunks) {
+ while (finishedChunks < chunkHashes.size() && chunkHashes.get(finishedChunks) != null) {
+ finishedChunks++;
+ if (finishedChunks == chunkList.size()) {
+ wasLastChunk = true;
+ }
}
}
+ if (chunkIndex + 1 == chunkList.size()) {
+ LOGGER.debug("Hashed last chunk #" + chunkIndex + ", total=" + chunkList.size() + ", finished=" + finishedChunks);
+ }
}
- if (blockIndex % 20 == 0 || finishedBlocks == list.size()) {
- if (blockIndex + 1 == list.size()) {
- // Last block was hashed - make sure list gets to the server
- LOGGER.debug("Hashing done");
- for (int i = 0; i < 10; ++i) {
- if (submitHashes(true)) {
- LOGGER.debug("Hashes sent to server");
- break;
- }
- LOGGER.debug("Sending hashes failed...");
- try {
- Thread.sleep(2000);
- continue;
- } catch (InterruptedException e) {
- interrupt();
- return;
- }
+ if (wasLastChunk) {
+ // Last chunk was hashed - make sure list gets to the server
+ // Try up to 10 times
+ LOGGER.debug("Hashing done");
+ for (int i = 0; i < 10; ++i) {
+ if (submitHashes(true)) {
+ LOGGER.debug("Hashes sent to server");
+ break;
}
- return;
+ LOGGER.debug("Sending hashes failed...");
+ if (!Util.sleep(2000))
+ break; // Interrupted
}
+ } else if (chunkIndex % 20 == 0) {
// Mid-hashing - update server side
- QuickTimer.scheduleOnce(new Task() {
- @Override
- public void fire() {
- if (!submitHashes(false)) {
- LOGGER.warn("Server rejected block hash list");
- isCanceled = true;
- }
- }
- });
- }
+ if (!submitHashes(false)) {
+ LOGGER.warn("Server rejected partial block hash list");
+ isCanceled = true;
+ }
+ }
}
/**
@@ -209,18 +228,29 @@ public class AsyncHashGenerator extends Thread {
*/
private boolean submitHashes(boolean mustSucceed) {
List<ByteBuffer> subList;
- synchronized (blockHashes) {
- subList = new ArrayList<>( blockHashes.subList(0, finishedBlocks) );
+ boolean d;
+ synchronized (chunkHashes) {
+ subList = new ArrayList<>( chunkHashes.subList(0, finishedChunks) );
+ d = (finishedChunks == chunkList.size());
+ }
+ if (!d) {
+ d = System.currentTimeMillis() > nextSendingMsg;
+ }
+ if (d) {
+ nextSendingMsg = System.currentTimeMillis() + 30000;
+ LOGGER.debug("Preparing to send hash list to server (" + subList.size() + " / " + (uploadToken != null) + ")");
}
if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list
return true;
try {
+ if (d) LOGGER.debug("Making updateBlockHashes call");
ThriftManager.getSatClient().updateBlockHashes(uploadToken, subList, Session.getSatelliteToken());
+ if (d) LOGGER.debug("updateBlockHashes call succeeded");
} catch (TInvalidTokenException e) {
LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!");
isCanceled = true;
return false;
- } catch (TException e) {
+ } catch (Exception e) {
LOGGER.warn("Unknown exception when submitting hashList to sat", e);
if (mustSucceed)
return false;
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java
index 8471ca8f..d9d188a2 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java
@@ -6,7 +6,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
@@ -36,7 +35,7 @@ public class UploadTask extends TransferTask {
private final int port;
private final String uploadToken;
private final long startTime;
- private String remoteError = null;
+ private String transferConnectionError = null;
/**
* Keep track of the number of active upload connections
@@ -120,13 +119,13 @@ public class UploadTask extends TransferTask {
}
});
if (ret) {
- remoteError = null;
+ transferConnectionError = null;
consecutiveInitFails.set(0);
} else {
String err = uploader.getRemoteError();
- if (err != null && !err.equals(remoteError)) {
+ if (err != null && !err.equals(transferConnectionError)) {
LOGGER.warn("Upload task remote error: " + err);
- remoteError = err;
+ transferConnectionError = err;
}
consecutiveInitFails.incrementAndGet();
}
@@ -148,6 +147,7 @@ public class UploadTask extends TransferTask {
private long lastThriftUpdate = 0;
private long virtualSpeed = 0;
+ private long nextQueryDebug;
@Override
protected TransferEvent getTransferEvent() {
@@ -158,14 +158,20 @@ public class UploadTask extends TransferTask {
if (lastThriftUpdate + THRIFT_INTERVAL_MS < now) {
lastThriftUpdate = now;
try {
+ if (System.currentTimeMillis() > nextQueryDebug) {
+ nextQueryDebug = System.currentTimeMillis() + 30000;
+ LOGGER.debug("Querying upload status...");
+ }
TransferStatus uploadStatus = ThriftManager.getSatClient().queryUploadStatus(uploadToken);
state = uploadStatus.getState();
blocks = uploadStatus.getBlockStatus();
} catch (TInvalidTokenException e) {
error = "Upload token unknown!?";
state = TransferState.ERROR;
- } catch (TException e) {
+ LOGGER.warn("Cannot query upload status: Token not known by the server");
+ } catch (Exception e) {
error = "Exception quering upload status: " + e.toString();
+ LOGGER.warn("Cannot query upload status", e);
}
}
long speed = 0;
@@ -189,8 +195,8 @@ public class UploadTask extends TransferTask {
virtualSpeed = ((blocks.length - missing) * CHUNK_SIZE * 1000) / (System.currentTimeMillis() - startTime + 1);
}
}
- if (remoteError != null && (error == null || remoteError.equals("Out of disk space"))) {
- error = remoteError;
+ if (transferConnectionError != null && (error == null || transferConnectionError.equals("Out of disk space"))) {
+ error = transferConnectionError;
}
TransferEvent event = new TransferEvent(state, blocks, speed, virtualSpeed, timeRemaining, error);
return event;
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/gui/wizard/page/ImageUploadSummaryPage.java b/dozentenmodul/src/main/java/org/openslx/dozmod/gui/wizard/page/ImageUploadSummaryPage.java
index 7f310f3b..400ee6c6 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/gui/wizard/page/ImageUploadSummaryPage.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/gui/wizard/page/ImageUploadSummaryPage.java
@@ -38,7 +38,6 @@ public class ImageUploadSummaryPage extends ImageUploadSummaryPageLayout {
private final TransferEventListener uploadListener = new TransferEventListener() {
@Override
public void update(TransferEvent event) {
- LOGGER.debug("update transfer event");
if (!pageIsVisible)
return;
if (event.progress != null) {