3 files changed, 106 insertions, 71 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
index e427d064..83e89302 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
@@ -16,26 +16,24 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
 import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
 import org.openslx.dozmod.thrift.Session;
 import org.openslx.filetransfer.util.FileChunk;
 import org.openslx.thrifthelper.ThriftManager;
 import org.openslx.util.GrowingThreadPoolExecutor;
 import org.openslx.util.PrioThreadFactory;
-import org.openslx.util.QuickTimer;
-import org.openslx.util.QuickTimer.Task;
 import org.openslx.util.Util;
 
 public class AsyncHashGenerator extends Thread {
 
     private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class);
 
-    private static final ThreadPoolExecutor pool = new GrowingThreadPoolExecutor(1, Runtime.getRuntime()
-            .availableProcessors(), 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(4),
-            new PrioThreadFactory("HashGen"));
+    private static final ThreadPoolExecutor HASH_WORK_POOL = new GrowingThreadPoolExecutor(1,
+            Math.max(1, Runtime.getRuntime().availableProcessors() - 1),
+            10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2),
+            new PrioThreadFactory("HashGen"), new ThreadPoolExecutor.CallerRunsPolicy());
 
-    private static final ThreadLocal<MessageDigest> sha1 = new ThreadLocal<MessageDigest>() {
+    private static final ThreadLocal<MessageDigest> SHA1_DIGESTER = new ThreadLocal<MessageDigest>() {
         @Override
         protected MessageDigest initialValue() {
             try {
@@ -48,10 +46,11 @@ public class AsyncHashGenerator extends Thread {
     };
 
     private String uploadToken = null;
-    private int finishedBlocks = 0;
+    private int finishedChunks = 0;
     private final RandomAccessFile file;
-    private final List<ByteBuffer> blockHashes;
-    private final List<FileChunk> list;
+    private final List<ByteBuffer> chunkHashes;
+    private final List<FileChunk> chunkList;
+    private long nextReadMsg, nextDoneMsg, nextSendingMsg; // for debug spam :-(
 
     private volatile boolean isCanceled = false;
 
@@ -62,9 +61,10 @@ public class AsyncHashGenerator extends Thread {
             LOGGER.warn("Could not open file for hash-checking. Will not send checksums to satellite", e);
             throw e;
         }
-        list = new ArrayList<>();
-        FileChunk.createChunkList(list, uploadFile.length(), null);
-        blockHashes = new ArrayList<>(list.size());
+        LOGGER.debug("Opened file for hashing");
+        chunkList = new ArrayList<>();
+        FileChunk.createChunkList(chunkList, uploadFile.length(), null);
+        chunkHashes = new ArrayList<>(chunkList.size());
         setDaemon(true);
         setName("HashGenerator");
     }
@@ -78,15 +78,23 @@ public class AsyncHashGenerator extends Thread {
 
     @Override
     public void run() {
+        LOGGER.debug("Started hash reader worker");
         try {
-            for (FileChunk chunk : list) {
+            for (FileChunk chunk : chunkList) {
                 if (isCanceled) {
                     LOGGER.debug("Cancelled chunk reader (1)");
                     break;
                 }
                 Block block;
                 try {
-                    byte[] buffer = new byte[chunk.range.getLength()];
+                    byte[] buffer;
+                    try {
+                        buffer = new byte[chunk.range.getLength()];
+                    } catch (OutOfMemoryError e) {
+                        LOGGER.info("Low memory - slowing down hashing");
+                        Util.sleep(5000);
+                        continue;
+                    }
                     file.seek(chunk.range.startOffset);
                     file.readFully(buffer);
                     block = new Block(chunk, buffer);
@@ -98,19 +106,27 @@ public class AsyncHashGenerator extends Thread {
                     LOGGER.debug("Cancelled chunk reader (2)");
                     break;
                 }
+                //
+                if (System.currentTimeMillis() > nextReadMsg) {
+                    nextReadMsg = System.currentTimeMillis() + 30000;
+                    LOGGER.debug("Read chunk " + chunk.getChunkIndex());
+                }
+                //
                 for (;;) {
-                    if (pool.isTerminating() || pool.isTerminated()) {
+                    if (HASH_WORK_POOL.isTerminating() || HASH_WORK_POOL.isTerminated() || HASH_WORK_POOL.isShutdown()) {
+                        LOGGER.warn("Aborting current hash job - pool has shut down");
                         Thread.currentThread().interrupt();
                         return;
                     }
                     try {
-                        pool.execute(block);
+                        HASH_WORK_POOL.execute(block);
                         // Don't hash too furiously in the background if the upload didn't start yet
-                        if (uploadToken == null && chunk.range.startOffset > FileChunk.CHUNK_SIZE * 4) {
+                        if (uploadToken == null && chunk.getChunkIndex() > 4) {
                             Util.sleep(200);
                         }
                     } catch (RejectedExecutionException e) {
-                        Util.sleep(100);
+                        LOGGER.warn("Hash pool worker rejected a hash job!? Retrying...");
+                        Util.sleep(1000);
                         continue;
                     }
                     break;
@@ -126,6 +142,9 @@ public class AsyncHashGenerator extends Thread {
         isCanceled = true;
     }
 
+    /**
+     * Worker for hashing chunk. Processed via thread pool.
+     */
     private class Block implements Runnable {
         public final FileChunk chunk;
         public final byte[] buffer;
@@ -137,7 +156,7 @@ public class AsyncHashGenerator extends Thread {
 
         @Override
         public void run() {
-            MessageDigest digester = sha1.get();
+            MessageDigest digester = SHA1_DIGESTER.get();
             digester.update(buffer, 0, chunk.range.getLength());
             byte[] hash = digester.digest();
             hashDone(chunk, hash);
         }
     }
 
     /**
-     * Called by worker thread when a block has been hashed.
+     * Called by worker thread when a chunk has been hashed.
      * This means this method is not running in the current
      * AsyncHashGenerator thread but one of the workers.
      *
@@ -153,53 +172,53 @@ public class AsyncHashGenerator extends Thread {
      * @param
      * @param hash
      */
     private void hashDone(FileChunk chunk, byte[] hash) {
-        int blockIndex = chunk.getChunkIndex();
-        synchronized (blockHashes) {
-            while (blockHashes.size() < blockIndex) {
-                blockHashes.add(null);
+        int chunkIndex = chunk.getChunkIndex();
+        boolean wasLastChunk = false;
+        if (System.currentTimeMillis() > nextDoneMsg) {
+            nextDoneMsg = System.currentTimeMillis() + 30000;
+            LOGGER.debug("Done hashing chunk " + chunkIndex);
+        }
+        synchronized (chunkHashes) {
+            while (chunkHashes.size() < chunkIndex) {
+                chunkHashes.add(null);
             }
-            if (blockHashes.size() == blockIndex) {
-                blockHashes.add(ByteBuffer.wrap(hash));
+            if (chunkHashes.size() == chunkIndex) {
+                chunkHashes.add(ByteBuffer.wrap(hash));
             } else {
-                blockHashes.set(blockIndex, ByteBuffer.wrap(hash));
+                chunkHashes.set(chunkIndex, ByteBuffer.wrap(hash));
             }
-            if (blockIndex == finishedBlocks) {
-                while (finishedBlocks < blockHashes.size() && blockHashes.get(finishedBlocks) != null) {
-                    finishedBlocks++;
+            if (chunkIndex == finishedChunks) {
+                while (finishedChunks < chunkHashes.size() && chunkHashes.get(finishedChunks) != null) {
+                    finishedChunks++;
+                    if (finishedChunks == chunkList.size()) {
+                        wasLastChunk = true;
+                    }
                 }
             }
+            if (chunkIndex + 1 == chunkList.size()) {
+                LOGGER.debug("Hashed last chunk #" + chunkIndex + ", total=" + chunkList.size() + ", finished=" + finishedChunks);
+            }
         }
-        if (blockIndex % 20 == 0 || finishedBlocks == list.size()) {
-            if (blockIndex + 1 == list.size()) {
-                // Last block was hashed - make sure list gets to the server
-                LOGGER.debug("Hashing done");
-                for (int i = 0; i < 10; ++i) {
-                    if (submitHashes(true)) {
-                        LOGGER.debug("Hashes sent to server");
-                        break;
-                    }
-                    LOGGER.debug("Sending hashes failed...");
-                    try {
-                        Thread.sleep(2000);
-                        continue;
-                    } catch (InterruptedException e) {
-                        interrupt();
-                        return;
-                    }
+        if (wasLastChunk) {
+            // Last chunk was hashed - make sure list gets to the server
+            // Try up to 10 times
+            LOGGER.debug("Hashing done");
+            for (int i = 0; i < 10; ++i) {
+                if (submitHashes(true)) {
+                    LOGGER.debug("Hashes sent to server");
+                    break;
                 }
-                return;
+                LOGGER.debug("Sending hashes failed...");
+                if (!Util.sleep(2000))
+                    break; // Interrupted
             }
+        } else if (chunkIndex % 20 == 0) {
             // Mid-hashing - update server side
-            QuickTimer.scheduleOnce(new Task() {
-                @Override
-                public void fire() {
-                    if (!submitHashes(false)) {
-                        LOGGER.warn("Server rejected block hash list");
-                        isCanceled = true;
-                    }
-                }
-            });
-        }
+            if (!submitHashes(false)) {
+                LOGGER.warn("Server rejected partial block hash list");
+                isCanceled = true;
+            }
+        }
     }
 
     /**
@@ -209,18 +228,29 @@ public class AsyncHashGenerator extends Thread {
      */
     private boolean submitHashes(boolean mustSucceed) {
         List<ByteBuffer> subList;
-        synchronized (blockHashes) {
-            subList = new ArrayList<>( blockHashes.subList(0, finishedBlocks) );
+        boolean d;
+        synchronized (chunkHashes) {
+            subList = new ArrayList<>( chunkHashes.subList(0, finishedChunks) );
+            d = (finishedChunks == chunkList.size());
+        }
+        if (!d) {
+            d = System.currentTimeMillis() > nextSendingMsg;
+        }
+        if (d) {
+            nextSendingMsg = System.currentTimeMillis() + 30000;
+            LOGGER.debug("Preparing to send hash list to server (" + subList.size() + " / " + (uploadToken != null) + ")");
         }
         if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list
             return true;
         try {
+            if (d) LOGGER.debug("Making updateBlockHashes call");
             ThriftManager.getSatClient().updateBlockHashes(uploadToken, subList, Session.getSatelliteToken());
+            if (d) LOGGER.debug("updateBlockHashes call succeeded");
         } catch (TInvalidTokenException e) {
             LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!");
             isCanceled = true;
             return false;
-        } catch (TException e) {
+        } catch (Exception e) {
             LOGGER.warn("Unknown exception when submitting hashList to sat", e);
             if (mustSucceed)
                 return false;
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java
index 8471ca8f..d9d188a2 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java
@@ -6,7 +6,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
 import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
 import org.openslx.bwlp.thrift.iface.TransferState;
 import org.openslx.bwlp.thrift.iface.TransferStatus;
@@ -36,7 +35,7 @@ public class UploadTask extends TransferTask {
     private final int port;
     private final String uploadToken;
     private final long startTime;
-    private String remoteError = null;
+    private String transferConnectionError = null;
 
     /**
      * Keep track of the number of active upload connections
@@ -120,13 +119,13 @@ public class UploadTask extends TransferTask {
             }
         });
         if (ret) {
-            remoteError = null;
+            transferConnectionError = null;
             consecutiveInitFails.set(0);
         } else {
             String err = uploader.getRemoteError();
-            if (err != null && !err.equals(remoteError)) {
+            if (err != null && !err.equals(transferConnectionError)) {
                 LOGGER.warn("Upload task remote error: " + err);
-                remoteError = err;
+                transferConnectionError = err;
             }
             consecutiveInitFails.incrementAndGet();
         }
@@ -148,6 +147,7 @@ public class UploadTask extends TransferTask {
 
     private long lastThriftUpdate = 0;
     private long virtualSpeed = 0;
+    private long nextQueryDebug;
 
     @Override
     protected TransferEvent getTransferEvent() {
@@ -158,14 +158,20 @@ public class UploadTask extends TransferTask {
         if (lastThriftUpdate + THRIFT_INTERVAL_MS < now) {
             lastThriftUpdate = now;
             try {
+                if (System.currentTimeMillis() > nextQueryDebug) {
+                    nextQueryDebug = System.currentTimeMillis() + 30000;
+                    LOGGER.debug("Querying upload status...");
+                }
                 TransferStatus uploadStatus = ThriftManager.getSatClient().queryUploadStatus(uploadToken);
                 state = uploadStatus.getState();
                 blocks = uploadStatus.getBlockStatus();
             } catch (TInvalidTokenException e) {
                 error = "Upload token unknown!?";
                 state = TransferState.ERROR;
-            } catch (TException e) {
+                LOGGER.warn("Cannot query upload status: Token not known by the server");
+            } catch (Exception e) {
                 error = "Exception quering upload status: " + e.toString();
+                LOGGER.warn("Cannot query upload status", e);
             }
         }
         long speed = 0;
@@ -189,8 +195,8 @@ public class UploadTask extends TransferTask {
                 virtualSpeed = ((blocks.length - missing) * CHUNK_SIZE * 1000) / (System.currentTimeMillis() - startTime + 1);
             }
         }
-        if (remoteError != null && (error == null || remoteError.equals("Out of disk space"))) {
-            error = remoteError;
+        if (transferConnectionError != null && (error == null || transferConnectionError.equals("Out of disk space"))) {
+            error = transferConnectionError;
         }
         TransferEvent event = new TransferEvent(state, blocks, speed, virtualSpeed, timeRemaining, error);
         return event;
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/gui/wizard/page/ImageUploadSummaryPage.java b/dozentenmodul/src/main/java/org/openslx/dozmod/gui/wizard/page/ImageUploadSummaryPage.java
index 7f310f3b..400ee6c6 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/gui/wizard/page/ImageUploadSummaryPage.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/gui/wizard/page/ImageUploadSummaryPage.java
@@ -38,7 +38,6 @@ public class ImageUploadSummaryPage extends ImageUploadSummaryPageLayout {
     private final TransferEventListener uploadListener = new TransferEventListener() {
         @Override
        public void update(TransferEvent event) {
-            LOGGER.debug("update transfer event");
             if (!pageIsVisible)
                 return;
             if (event.progress != null) {
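
A note on the pool change in AsyncHashGenerator: the work queue shrinks from 4 to 2 slots, one CPU core is left free, and ThreadPoolExecutor.CallerRunsPolicy becomes the saturation handler. With that policy, execute() no longer throws RejectedExecutionException when all workers and queue slots are busy; instead the submitting thread (here the chunk reader) runs the task itself, which throttles file reads down to hashing throughput. The surviving RejectedExecutionException branch is left only as a safety net for shutdown races. The sketch below demonstrates the policy with the stock JDK executor, since GrowingThreadPoolExecutor is an OpenSLX helper (presumably one that prefers spawning threads over queueing, which the stock executor does not do); all names in it are illustrative, not part of the patch.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        // Same shape as the patched pool: small bounded queue, CallerRunsPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1,
                Math.max(1, Runtime.getRuntime().availableProcessors() - 1),
                10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2),
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 16; ++i) {
            final int job = i;
            // Once workers and queue are saturated, execute() runs the job
            // in the calling thread instead of throwing, so this loop slows
            // down to match the pool's throughput.
            pool.execute(() -> System.out.println(
                    "job " + job + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}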
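The subtle part of hashDone() is its bookkeeping: worker threads finish chunks out of order, but updateBlockHashes() must only ever receive a gap-free prefix of the hash list, so the list is padded with nulls and finishedChunks tracks how far the contiguous prefix reaches. The patch additionally decides the final, blocking submit via wasLastChunk while still holding the lock, instead of re-deriving it afterwards. Distilled into a hypothetical stand-alone class (not part of the patch):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class ChunkHashPrefix {
    private final List<ByteBuffer> hashes = new ArrayList<>();
    private int finished = 0; // length of the gap-free prefix

    /** Store a hash that may arrive out of order; returns the new prefix length. */
    synchronized int store(int index, byte[] hash) {
        while (hashes.size() < index)
            hashes.add(null); // pad holes for chunks still being hashed
        if (hashes.size() == index)
            hashes.add(ByteBuffer.wrap(hash));
        else
            hashes.set(index, ByteBuffer.wrap(hash));
        // Advance the prefix over every slot that is now filled
        while (finished < hashes.size() && hashes.get(finished) != null)
            finished++;
        return finished; // only hashes[0..finished) may go to the server
    }
}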
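Finally, the retry loop for the last submit shrinks because the new code leans on Util.sleep() reporting interruption through its return value ("if (!Util.sleep(2000)) break; // Interrupted"), replacing the old try/catch around Thread.sleep() plus interrupt()/return. Assuming those semantics, which are not verified against openslx-util here, the helper would look roughly like this:

final class UtilSketch {
    private UtilSketch() {}

    // Hypothetical sketch of the assumed openslx-util semantics:
    // returns true if the full interval elapsed, false on interruption.
    static boolean sleep(int millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false; // caller should bail out
        }
    }
}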