diff options
| author | Simon Rettberg | 2015-08-20 17:15:01 +0200 |
|---|---|---|
| committer | Simon Rettberg | 2015-08-20 17:15:01 +0200 |
| commit | 9000efe64346cc08463114ab73d3d79188be884d (patch) | |
| tree | e70238f1d6a6f0e6c79f8f03b316132cf288e9ce /dozentenmodul/src/main/java/org | |
| parent | [client] Add ImageCustomPermissionManager and adjusted ImageCustomPermissionW... (diff) | |
| download | tutor-module-9000efe64346cc08463114ab73d3d79188be884d.tar.gz tutor-module-9000efe64346cc08463114ab73d3d79188be884d.tar.xz tutor-module-9000efe64346cc08463114ab73d3d79188be884d.zip | |
[client] Make AsyncHashGenerator faster by using two threads (I/O, Hashing)
Diffstat (limited to 'dozentenmodul/src/main/java/org')
| -rw-r--r-- | dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java | 152 | ||||
| -rw-r--r-- | dozentenmodul/src/main/java/org/openslx/dozmod/thrift/ThriftActions.java | 13 |
2 files changed, 126 insertions, 39 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java index 0361631c..f53d5d0b 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java @@ -9,6 +9,8 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; import org.apache.thrift.TException; @@ -17,73 +19,80 @@ import org.openslx.filetransfer.util.FileChunk; import org.openslx.thrifthelper.ThriftManager; import org.openslx.util.Util; -public class AsyncHashGenerator implements Runnable { +public class AsyncHashGenerator extends Thread { private static final Logger LOGGER = Logger.getLogger(AsyncHashGenerator.class); - private final File uploadFile; - private final String uploadToken; + private final RandomAccessFile file; + private final List<ByteBuffer> blockHashes; + private final MessageDigest digester; + private final List<FileChunk> list; + private final AsyncHashGenerator thisGenerator = this; private volatile boolean isCanceled = false; - public AsyncHashGenerator(String uploadToken, File uploadFile) { + public AsyncHashGenerator(String uploadToken, File uploadFile) throws FileNotFoundException, + NoSuchAlgorithmException { this.uploadToken = uploadToken; - this.uploadFile = uploadFile; - } - - @Override - public void run() { - MessageDigest digester; try { digester = MessageDigest.getInstance("SHA-1"); } catch (NoSuchAlgorithmException e1) { LOGGER.warn("No SHA-1 MD available. Cannot hash file", e1); - return; + throw e1; } - final RandomAccessFile file; try { file = new RandomAccessFile(uploadFile, "r"); } catch (FileNotFoundException e) { LOGGER.warn("Could not open file for hash-checking. Will not send checksums to satellite", e); - return; + throw e; } + list = new ArrayList<>(); + FileChunk.createChunkList(list, uploadFile.length(), null); + blockHashes = new ArrayList<>(list.size()); + setDaemon(true); + setName("HashGenerator"); + } + + @Override + public void run() { + Checker checker = new Checker(); + checker.start(); try { - List<FileChunk> list = new ArrayList<>(); - FileChunk.createChunkList(list, uploadFile.length(), null); - byte[] buffer = new byte[(int) TransferTask.CHUNK_SIZE]; - List<ByteBuffer> blockHashes = new ArrayList<>(list.size()); - for (int i = 0; i < list.size(); i++) { + for (FileChunk chunk : list) { if (isCanceled) break; - FileChunk chunk = list.get(i); + Block block; try { + byte[] buffer = new byte[chunk.range.getLength()]; file.seek(chunk.range.startOffset); - file.readFully(buffer, 0, chunk.range.getLength()); + file.readFully(buffer); + block = new Block(chunk, buffer); } catch (IOException e) { LOGGER.warn("Could not read file chunk " + chunk.getChunkIndex() + ", skipping", e); - continue; + block = new Block(chunk, null); } if (isCanceled) break; - digester.update(buffer, 0, chunk.range.getLength()); - byte[] hash = digester.digest(); - blockHashes.add(ByteBuffer.wrap(hash)); - if (i <= 5 || i % 20 == 0 || i + 1 == list.size()) { - if (isCanceled) - break; - try { - ThriftManager.getSatClient().updateBlockHashes(uploadToken, blockHashes); - } catch (TInvalidTokenException e) { - LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!"); - break; - } catch (TException e) { - LOGGER.warn("Unknown exception when submitting hashList to sat", e); - } + try { + checker.put(block); + } catch (InterruptedException e) { + LOGGER.info("Reader thread for hash checking got interrupted", e); + interrupt(); + break; } } + try { + if (interrupted() || isCanceled) { + checker.interrupt(); + } + checker.join(); + } catch (InterruptedException e) { + // + } } finally { Util.safeClose(file); + LOGGER.debug("AsyncHashGenerator done"); } } @@ -91,4 +100,77 @@ public class AsyncHashGenerator implements Runnable { isCanceled = true; } + private static class Block { + public final FileChunk chunk; + public final byte[] buffer; + + public Block(FileChunk chunk, byte[] buffer) { + this.chunk = chunk; + this.buffer = buffer; + } + } + + private class Checker extends Thread { + + private final BlockingQueue<Block> queue = new LinkedBlockingQueue<>(5);; + + public Checker() { + setName("Hasher"); + setDaemon(true); + } + + public void put(Block block) throws InterruptedException { + if (!isAlive()) { + Thread.currentThread().interrupt(); + return; + } + queue.put(block); + } + + @Override + public void run() { + try { + while (!interrupted() && !isCanceled) { + Block block = queue.take(); + if (isCanceled) + break; + final byte[] hash; + if (block.buffer == null) { + hash = new byte[0]; + } else { + digester.update(block.buffer, 0, block.chunk.range.getLength()); + hash = digester.digest(); + } + int blockIndex = block.chunk.getChunkIndex(); + if (blockHashes.size() != blockIndex) { + LOGGER.warn("Inconsistent state: blockHashes.size() != currentBlockIndex"); + isCanceled = true; + break; + } + blockHashes.add(ByteBuffer.wrap(hash)); + LOGGER.debug("blockIndex=" + blockIndex + ", list.size()=" + list.size()); + if (blockIndex <= 5 || blockIndex % 20 == 0 || blockIndex + 1 == list.size()) { + try { + ThriftManager.getSatClient().updateBlockHashes(uploadToken, blockHashes); + } catch (TInvalidTokenException e) { + LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!"); + isCanceled = true; + break; + } catch (TException e) { + LOGGER.warn("Unknown exception when submitting hashList to sat", e); + } + if (blockIndex + 1 == list.size()) + break; + } + } + } catch (InterruptedException e) { + LOGGER.debug("Hash checker thread got interrupted."); + interrupt(); + } finally { + thisGenerator.interrupt(); + LOGGER.debug("Hasher done"); + } + } + } + } diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/thrift/ThriftActions.java b/dozentenmodul/src/main/java/org/openslx/dozmod/thrift/ThriftActions.java index 68eb935b..f1f1d8cd 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/thrift/ThriftActions.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/thrift/ThriftActions.java @@ -5,6 +5,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; +import java.security.NoSuchAlgorithmException; import java.util.List; import javax.swing.JFileChooser; @@ -254,10 +255,14 @@ public class ThriftActions { + diskFile.getAbsolutePath(), MessageType.ERROR, LOGGER, e); return null; } - hashGen = new AsyncHashGenerator(transferInformation.token, diskFile); - Thread hashThread = new Thread(hashGen); - hashThread.setDaemon(true); - hashThread.start(); + try { + hashGen = new AsyncHashGenerator(transferInformation.token, diskFile); + hashGen.start(); + Util.sleep(50); // A little ugly... Give the hash generator a head start + } catch (FileNotFoundException | NoSuchAlgorithmException e) { + Gui.showMessageBox(frame, "Kann keine Block-Hashes für den Upload berechnen, " + + "automatische Fehlerkorrektur deaktiviert.", MessageType.WARNING, LOGGER, e); + } Util.sleep(50); // A little ugly... Give the hash generator a head start Thread uploadThread = new Thread(uploadTask); uploadThread.setDaemon(true); |
