diff options
| author | Simon Rettberg | 2016-01-28 17:00:33 +0100 |
|---|---|---|
| committer | Simon Rettberg | 2016-01-28 17:00:33 +0100 |
| commit | 5c457d05d090fbb28f0923f2978e49356b4790ff (patch) | |
| tree | c76cf9f4015eaeb626363df05567fcbae8259187 /dozentenmodul/src/main/java | |
| parent | [client] UploadInitiator: Fix error message on low disk space (diff) | |
| download | tutor-module-5c457d05d090fbb28f0923f2978e49356b4790ff.tar.gz tutor-module-5c457d05d090fbb28f0923f2978e49356b4790ff.tar.xz tutor-module-5c457d05d090fbb28f0923f2978e49356b4790ff.zip | |
[client] Make sure hashes get submitted after last block was hashed
Diffstat (limited to 'dozentenmodul/src/main/java')
| -rw-r--r-- | dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java | 51 |
1 files changed, 42 insertions, 9 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java index b5e9cb60..028f8788 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java @@ -17,6 +17,8 @@ import org.apache.thrift.TException; import org.openslx.bwlp.thrift.iface.TInvalidTokenException; import org.openslx.filetransfer.util.FileChunk; import org.openslx.thrifthelper.ThriftManager; +import org.openslx.util.QuickTimer; +import org.openslx.util.QuickTimer.Task; import org.openslx.util.Util; public class AsyncHashGenerator extends Thread { @@ -55,7 +57,7 @@ public class AsyncHashGenerator extends Thread { public void setUploadToken(String token) { if (!isCanceled && this.uploadToken == null) { this.uploadToken = token; - submitHashes(); + submitHashes(false); } } @@ -65,8 +67,10 @@ public class AsyncHashGenerator extends Thread { checker.start(); try { for (FileChunk chunk : list) { - if (isCanceled) + if (isCanceled) { + LOGGER.debug("Cancelled chunk reader (1)"); break; + } Block block; try { byte[] buffer = new byte[chunk.range.getLength()]; @@ -77,8 +81,10 @@ public class AsyncHashGenerator extends Thread { LOGGER.warn("Could not read file chunk " + chunk.getChunkIndex() + ", skipping", e); block = new Block(chunk, null); } - if (isCanceled) + if (isCanceled) { + LOGGER.debug("Cancelled chunk reader (2)"); break; + } try { checker.put(block); // Don't hash too furiously in the background if the upload didn't start yet @@ -93,6 +99,7 @@ public class AsyncHashGenerator extends Thread { } try { if (interrupted() || isCanceled) { + LOGGER.debug("Reader: Interrupting hasher"); checker.interrupt(); } checker.join(); @@ -105,6 +112,7 @@ public class AsyncHashGenerator extends Thread { } public void cancel() { + LOGGER.debug("Cancelled externally"); isCanceled = true; } @@ -120,7 +128,7 @@ public class AsyncHashGenerator extends Thread { private class Checker extends Thread { - private final BlockingQueue<Block> queue = new LinkedBlockingQueue<>(5);; + private final BlockingQueue<Block> queue = new LinkedBlockingQueue<>(5); public Checker() { setName("Hasher"); @@ -132,7 +140,11 @@ public class AsyncHashGenerator extends Thread { Thread.currentThread().interrupt(); return; } - queue.put(block); + if (!queue.offer(block)) { + LOGGER.debug("Putting read block into hasher queue is blocking!"); + queue.put(block); + LOGGER.debug("Put succeeded"); + } } @Override @@ -140,8 +152,10 @@ public class AsyncHashGenerator extends Thread { try { while (!interrupted() && !isCanceled) { Block block = queue.take(); - if (isCanceled) + if (isCanceled) { + LOGGER.debug("Hashing thread was cancelled"); break; + } final byte[] hash; if (block.buffer == null) { hash = new byte[0]; @@ -159,12 +173,29 @@ public class AsyncHashGenerator extends Thread { blockHashes.add(ByteBuffer.wrap(hash)); } if (blockIndex % 20 == 0 || blockIndex + 1 == list.size()) { - if (!submitHashes()) - break; if (blockIndex + 1 == list.size()) { + // Last block was hashed - make sure list gets to the server LOGGER.debug("Hashing done"); + for (int i = 0; i < 10; ++i) { + if (submitHashes(true)) { + LOGGER.debug("Hashes sent to server"); + break; + } + LOGGER.debug("Sending hashes failed..."); + Thread.sleep(2000); + } break; } + // Mid-hashing - update server side + QuickTimer.scheduleOnce(new Task() { + @Override + public void fire() { + if (!submitHashes(false)) { + LOGGER.warn("Server rejected block hash list"); + isCanceled = true; + } + } + }); } } } catch (InterruptedException e) { @@ -181,7 +212,7 @@ public class AsyncHashGenerator extends Thread { * * @return false if the token is not known to the server */ - private boolean submitHashes() { + private boolean submitHashes(boolean mustSucceed) { synchronized (blockHashes) { if (uploadToken == null || blockHashes.isEmpty()) // No token yet, cannot submit, or empty list return true; @@ -193,6 +224,8 @@ public class AsyncHashGenerator extends Thread { return false; } catch (TException e) { LOGGER.warn("Unknown exception when submitting hashList to sat", e); + if (mustSucceed) + return false; } return true; } |
