diff options
Diffstat (limited to 'dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer')
4 files changed, 84 insertions, 28 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java index cf593200..8ef12e12 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java @@ -13,6 +13,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -29,8 +30,9 @@ public class AsyncHashGenerator extends Thread { private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class); private static final ThreadPoolExecutor HASH_WORK_POOL = new GrowingThreadPoolExecutor(1, - Math.max(1, Runtime.getRuntime().availableProcessors() - 1), - 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2), + Math.max(1, (int)Math.min(Runtime.getRuntime().availableProcessors() - 1, + Runtime.getRuntime().maxMemory() / (FileChunk.CHUNK_SIZE * 3))), + 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), new PrioThreadFactory("HashGen"), new ThreadPoolExecutor.CallerRunsPolicy()); private static final ThreadLocal<MessageDigest> SHA1_DIGESTER = new ThreadLocal<MessageDigest>() { @@ -52,9 +54,13 @@ public class AsyncHashGenerator extends Thread { private final List<FileChunk> chunkList; private long nextReadMsg, nextDoneMsg, nextSendingMsg; // for debug spam :-( private boolean readingDone = false; - private boolean hashingDone = false; + private AtomicInteger pendingHashes = new AtomicInteger(); private volatile boolean isCanceled = false; + + static { + LOGGER.info("Using " + HASH_WORK_POOL.getMaximumPoolSize() + " hash workers."); + } public AsyncHashGenerator(File uploadFile) throws FileNotFoundException, NoSuchAlgorithmException { try { @@ -70,6 +76,15 @@ public class AsyncHashGenerator extends Thread { setDaemon(true); setName("HashGenerator"); } + + @Override + public synchronized void start() { + if (isCanceled) { + LOGGER.warn("Cannot start hashing if it has been cancelled before"); + } else { + super.start(); + } + } public void setUploadToken(String token) { if (!isCanceled && this.uploadToken == null) { @@ -90,14 +105,15 @@ public class AsyncHashGenerator extends Thread { } Block block; try { - byte[] buffer; - try { - buffer = new byte[chunk.range.getLength()]; - } catch (OutOfMemoryError e) { - LOGGER.info("Low memory - slowing down hashing"); - Util.sleep(5000); - continue; - } + byte[] buffer = null; + do { + try { + buffer = new byte[chunk.range.getLength()]; + } catch (OutOfMemoryError e) { + LOGGER.info("Low memory - slowing down hashing"); + Util.sleep(5000); + } + } while (buffer == null); file.seek(chunk.range.startOffset); file.readFully(buffer); block = new Block(chunk, buffer); @@ -111,23 +127,26 @@ public class AsyncHashGenerator extends Thread { } // if (System.currentTimeMillis() > nextReadMsg) { - nextReadMsg = System.currentTimeMillis() + 30000; + nextReadMsg = System.currentTimeMillis() + 60000; LOGGER.debug("Read chunk " + chunk.getChunkIndex()); } // for (;;) { if (HASH_WORK_POOL.isTerminating() || HASH_WORK_POOL.isTerminated() || HASH_WORK_POOL.isShutdown()) { LOGGER.warn("Aborting current hash job - pool has shut down"); + isCanceled = true; Thread.currentThread().interrupt(); return; } try { + pendingHashes.incrementAndGet(); HASH_WORK_POOL.execute(block); // Don't hash too furiously in the background if the upload didn't start yet if (uploadToken == null && chunk.getChunkIndex() > 4) { Util.sleep(200); } } catch (RejectedExecutionException e) { + pendingHashes.decrementAndGet(); LOGGER.warn("Hash pool worker rejected a hash job!? Retrying..."); Util.sleep(1000); continue; @@ -142,7 +161,7 @@ public class AsyncHashGenerator extends Thread { } } - public void cancel() { + public synchronized void cancel() { LOGGER.debug("Cancelled externally"); isCanceled = true; } @@ -152,7 +171,7 @@ public class AsyncHashGenerator extends Thread { */ private class Block implements Runnable { public final FileChunk chunk; - public final byte[] buffer; + public byte[] buffer; public Block(FileChunk chunk, byte[] buffer) { this.chunk = chunk; @@ -163,7 +182,14 @@ public class AsyncHashGenerator extends Thread { public void run() { MessageDigest digester = SHA1_DIGESTER.get(); digester.update(buffer, 0, chunk.range.getLength()); + this.buffer = null; // Clear reference before calling function below byte[] hash = digester.digest(); + synchronized (this) { + if (isCanceled) { + pendingHashes.decrementAndGet(); + return; + } + } hashDone(chunk, hash); } } @@ -180,7 +206,7 @@ public class AsyncHashGenerator extends Thread { int chunkIndex = chunk.getChunkIndex(); boolean wasLastChunk = false; if (System.currentTimeMillis() > nextDoneMsg) { - nextDoneMsg = System.currentTimeMillis() + 30000; + nextDoneMsg = System.currentTimeMillis() + 60000; LOGGER.debug("Done hashing chunk " + chunkIndex); } synchronized (chunkHashes) { @@ -224,8 +250,7 @@ public class AsyncHashGenerator extends Thread { isCanceled = true; } } - if (wasLastChunk) { - hashingDone = true; + if (pendingHashes.decrementAndGet() == 0) { cleanupIfDone(); } } @@ -235,15 +260,25 @@ public class AsyncHashGenerator extends Thread { * a reference to this class, at least we will not prevent this stuff from being * garbage collected. */ - private void cleanupIfDone() { - if (uploadToken == null && !isCanceled) + private synchronized void cleanupIfDone() { + if (!readingDone && isAlive()) return; - if (!readingDone) + if (uploadToken == null && !isCanceled) return; - if (!hashingDone && !isCanceled) + if (pendingHashes.get() != 0) return; + isCanceled = true; chunkHashes.clear(); chunkList.clear(); + LOGGER.debug("Hasher cleaned up"); + } + + /** + * @return true if this instance is not dong anything meaningful anymore + * and no reference to it needs to be kept around. + */ + public boolean canBeDiscarded() { + return isCanceled || (!isAlive() && pendingHashes.get() == 0); } /** @@ -252,6 +287,8 @@ public class AsyncHashGenerator extends Thread { * @return false if the token is not known to the server */ private boolean submitHashes(boolean mustSucceed) { + if (isCanceled) + return true; List<ByteBuffer> subList; boolean d; synchronized (chunkHashes) { @@ -262,7 +299,7 @@ public class AsyncHashGenerator extends Thread { d = System.currentTimeMillis() > nextSendingMsg; } if (d) { - nextSendingMsg = System.currentTimeMillis() + 30000; + nextSendingMsg = System.currentTimeMillis() + 60000; LOGGER.debug("Preparing to send hash list to server (" + subList.size() + " / " + (uploadToken != null) + ")"); } if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java index 0692f8a2..3b57222f 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java @@ -4,6 +4,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.RandomAccessFile; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -28,7 +29,9 @@ public class DownloadTask extends TransferTask { * Logger instance for this class. */ private final static Logger LOGGER = LogManager.getLogger(DownloadTask.class); - + + private static final AtomicInteger THREAD_ID = new AtomicInteger(); + private final String host; private final int port; private final String downloadToken; @@ -122,6 +125,10 @@ public class DownloadTask extends TransferTask { private class DownloadThread extends TransferThread { private Downloader downloader = null; private DownloadHandler cb = new DownloadHandler(); + + public DownloadThread() { + super("UpConn#" + THREAD_ID.incrementAndGet()); + } @Override public void run() { diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java index cca42929..4f7ed6fe 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java @@ -69,7 +69,7 @@ public abstract class TransferTask implements Runnable, TransferEventEmitter { ensureActivity(); Util.sleep(UPDATE_INTERVAL_MS); } - LOGGER.info("Transfer worker mainloop finished"); + LOGGER.debug("Transfer worker mainloop finished"); List<TransferThread> joinList = new ArrayList<>(); synchronized (transfers) { isCancelled = true; @@ -85,7 +85,7 @@ public abstract class TransferTask implements Runnable, TransferEventEmitter { Util.joinThread(t); } cleanup(); - LOGGER.info("Trasfer worker exiting"); + LOGGER.info("Transfer worker exiting"); } protected void cleanup() { @@ -197,7 +197,7 @@ public abstract class TransferTask implements Runnable, TransferEventEmitter { protected final void connectFailed(TransferThread thread) { synchronized (transfers) { connectingTransfers.remove(thread); - LOGGER.info("Establishing new transfer connection failed, [a:" + transfers.size() + "/c:" + LOGGER.debug("Establishing new transfer connection failed, [a:" + transfers.size() + "/c:" + connectingTransfers.size() + "]"); } } @@ -207,7 +207,7 @@ public abstract class TransferTask implements Runnable, TransferEventEmitter { connectingTransfers.remove(thread); if (!isCancelled) { transfers.add(thread); - LOGGER.info("Establishing new transfer connection succeeded, [a:" + transfers.size() + "/c:" + LOGGER.debug("Establishing new transfer connection succeeded, [a:" + transfers.size() + "/c:" + connectingTransfers.size() + "]"); return; } @@ -219,7 +219,7 @@ public abstract class TransferTask implements Runnable, TransferEventEmitter { protected final void transferEnded(TransferThread thread, boolean success) { synchronized (transfers) { transfers.remove(thread); - LOGGER.info("A transfer connection has finished (success=" + success + "), [a:" + transfers.size() + "/c:" + LOGGER.debug("A transfer connection has finished (success=" + success + "), [a:" + transfers.size() + "/c:" + connectingTransfers.size() + "]"); if (endgame && !success && transfers.isEmpty()) { // We had a transfer that reported success before, so we assume there are no more pending blocks @@ -262,6 +262,10 @@ public abstract class TransferTask implements Runnable, TransferEventEmitter { } protected abstract static class TransferThread extends Thread { + + public TransferThread(String name) { + super(name); + } @Override public abstract void run(); diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java index d9d188a2..84b9b47a 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java @@ -25,6 +25,9 @@ public class UploadTask extends TransferTask { * Logger instance for this class. */ private final static Logger LOGGER = LogManager.getLogger(UploadTask.class); + + private static final AtomicInteger THREAD_ID = new AtomicInteger(); + /** * Update interval of the block progress (needs thrift call to sat) */ @@ -64,6 +67,11 @@ public class UploadTask extends TransferTask { } private class UploadThread extends TransferThread { + + public UploadThread() { + super("UpConn#" + THREAD_ID.incrementAndGet()); + } + // private long totalBytesRead = 0; private long currentSpeed = 0; private Uploader uploader = null; |