summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2016-01-28 17:00:33 +0100
committerSimon Rettberg2016-01-28 17:00:33 +0100
commit5c457d05d090fbb28f0923f2978e49356b4790ff (patch)
treec76cf9f4015eaeb626363df05567fcbae8259187
parent[client] UploadInitiator: Fix error message on low disk space (diff)
downloadtutor-module-5c457d05d090fbb28f0923f2978e49356b4790ff.tar.gz
tutor-module-5c457d05d090fbb28f0923f2978e49356b4790ff.tar.xz
tutor-module-5c457d05d090fbb28f0923f2978e49356b4790ff.zip
[client] Make sure hashes get submitted after last block was hashed
-rw-r--r--dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java51
1 file changed, 42 insertions, 9 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
index b5e9cb60..028f8788 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
@@ -17,6 +17,8 @@ import org.apache.thrift.TException;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.thrifthelper.ThriftManager;
+import org.openslx.util.QuickTimer;
+import org.openslx.util.QuickTimer.Task;
import org.openslx.util.Util;
public class AsyncHashGenerator extends Thread {
@@ -55,7 +57,7 @@ public class AsyncHashGenerator extends Thread {
public void setUploadToken(String token) {
if (!isCanceled && this.uploadToken == null) {
this.uploadToken = token;
- submitHashes();
+ submitHashes(false);
}
}
@@ -65,8 +67,10 @@ public class AsyncHashGenerator extends Thread {
checker.start();
try {
for (FileChunk chunk : list) {
- if (isCanceled)
+ if (isCanceled) {
+ LOGGER.debug("Cancelled chunk reader (1)");
break;
+ }
Block block;
try {
byte[] buffer = new byte[chunk.range.getLength()];
@@ -77,8 +81,10 @@ public class AsyncHashGenerator extends Thread {
LOGGER.warn("Could not read file chunk " + chunk.getChunkIndex() + ", skipping", e);
block = new Block(chunk, null);
}
- if (isCanceled)
+ if (isCanceled) {
+ LOGGER.debug("Cancelled chunk reader (2)");
break;
+ }
try {
checker.put(block);
// Don't hash too furiously in the background if the upload didn't start yet
@@ -93,6 +99,7 @@ public class AsyncHashGenerator extends Thread {
}
try {
if (interrupted() || isCanceled) {
+ LOGGER.debug("Reader: Interrupting hasher");
checker.interrupt();
}
checker.join();
@@ -105,6 +112,7 @@ public class AsyncHashGenerator extends Thread {
}
public void cancel() {
+ LOGGER.debug("Cancelled externally");
isCanceled = true;
}
@@ -120,7 +128,7 @@ public class AsyncHashGenerator extends Thread {
private class Checker extends Thread {
- private final BlockingQueue<Block> queue = new LinkedBlockingQueue<>(5);;
+ private final BlockingQueue<Block> queue = new LinkedBlockingQueue<>(5);
public Checker() {
setName("Hasher");
@@ -132,7 +140,11 @@ public class AsyncHashGenerator extends Thread {
Thread.currentThread().interrupt();
return;
}
- queue.put(block);
+ if (!queue.offer(block)) {
+ LOGGER.debug("Putting read block into hasher queue is blocking!");
+ queue.put(block);
+ LOGGER.debug("Put succeeded");
+ }
}
@Override
@@ -140,8 +152,10 @@ public class AsyncHashGenerator extends Thread {
try {
while (!interrupted() && !isCanceled) {
Block block = queue.take();
- if (isCanceled)
+ if (isCanceled) {
+ LOGGER.debug("Hashing thread was cancelled");
break;
+ }
final byte[] hash;
if (block.buffer == null) {
hash = new byte[0];
@@ -159,12 +173,29 @@ public class AsyncHashGenerator extends Thread {
blockHashes.add(ByteBuffer.wrap(hash));
}
if (blockIndex % 20 == 0 || blockIndex + 1 == list.size()) {
- if (!submitHashes())
- break;
if (blockIndex + 1 == list.size()) {
+ // Last block was hashed - make sure list gets to the server
LOGGER.debug("Hashing done");
+ for (int i = 0; i < 10; ++i) {
+ if (submitHashes(true)) {
+ LOGGER.debug("Hashes sent to server");
+ break;
+ }
+ LOGGER.debug("Sending hashes failed...");
+ Thread.sleep(2000);
+ }
break;
}
+ // Mid-hashing - update server side
+ QuickTimer.scheduleOnce(new Task() {
+ @Override
+ public void fire() {
+ if (!submitHashes(false)) {
+ LOGGER.warn("Server rejected block hash list");
+ isCanceled = true;
+ }
+ }
+ });
}
}
} catch (InterruptedException e) {
@@ -181,7 +212,7 @@ public class AsyncHashGenerator extends Thread {
*
* @return false if the token is not known to the server
*/
- private boolean submitHashes() {
+ private boolean submitHashes(boolean mustSucceed) {
synchronized (blockHashes) {
if (uploadToken == null || blockHashes.isEmpty()) // No token yet, cannot submit, or empty list
return true;
@@ -193,6 +224,8 @@ public class AsyncHashGenerator extends Thread {
return false;
} catch (TException e) {
LOGGER.warn("Unknown exception when submitting hashList to sat", e);
+ if (mustSucceed)
+ return false;
}
return true;
}