package org.openslx.dozmod.filetransfer;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.dozmod.thrift.Session;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.GrowingThreadPoolExecutor;
import org.openslx.util.PrioThreadFactory;
import org.openslx.util.Util;

public class AsyncHashGenerator extends Thread {

    private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class);

    private static final ThreadPoolExecutor HASH_WORK_POOL = new GrowingThreadPoolExecutor(1,
            Math.max(1, Runtime.getRuntime().availableProcessors() - 1), 10, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2), new PrioThreadFactory("HashGen"),
            new ThreadPoolExecutor.CallerRunsPolicy());

    private static final ThreadLocal<MessageDigest> SHA1_DIGESTER = new ThreadLocal<MessageDigest>() {
        @Override
        protected MessageDigest initialValue() {
            try {
                return MessageDigest.getInstance("SHA-1");
            } catch (NoSuchAlgorithmException e) {
                LOGGER.warn("No SHA-1 MD available. Cannot hash file", e);
                return null;
            }
        }
    };

    private String uploadToken = null;
    private int finishedChunks = 0;
    private final RandomAccessFile file;
    private final List<ByteBuffer> chunkHashes;
    private final List<FileChunk> chunkList;
    private long nextReadMsg, nextDoneMsg, nextSendingMsg; // for debug spam :-(
    private boolean readingDone = false;
    private boolean hashingDone = false;

    private volatile boolean isCanceled = false;

    public AsyncHashGenerator(File uploadFile) throws FileNotFoundException, NoSuchAlgorithmException {
        try {
            file = new RandomAccessFile(uploadFile, "r");
        } catch (FileNotFoundException e) {
            LOGGER.warn("Could not open file for hash-checking. Will not send checksums to satellite", e);
            throw e;
        }
        LOGGER.debug("Opened file for hashing");
        chunkList = new ArrayList<>();
        FileChunk.createChunkList(chunkList, uploadFile.length(), null);
        chunkHashes = new ArrayList<>(chunkList.size());
        setDaemon(true);
        setName("HashGenerator");
    }
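    /*
     * Usage sketch (illustrative only, not taken from the surrounding code
     * base): callers presumably drive this class roughly as follows; the file
     * name and the token variable are made-up placeholders.
     *
     *   AsyncHashGenerator hasher = new AsyncHashGenerator(new File("image.vmdk"));
     *   hasher.start();               // daemon thread reads and hashes chunks
     *   hasher.setUploadToken(token); // once the server issued an upload token
     *   ...
     *   hasher.cancel();              // if the upload gets aborted
     */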
    public void setUploadToken(String token) {
        if (!isCanceled && this.uploadToken == null) {
            this.uploadToken = token;
            submitHashes(false);
        }
        cleanupIfDone();
    }

    @Override
    public void run() {
        LOGGER.debug("Started hash reader worker");
        try {
            for (FileChunk chunk : chunkList) {
                if (isCanceled) {
                    LOGGER.debug("Cancelled chunk reader (1)");
                    break;
                }
                Block block;
                try {
                    byte[] buffer;
                    try {
                        buffer = new byte[chunk.range.getLength()];
                    } catch (OutOfMemoryError e) {
                        LOGGER.info("Low memory - slowing down hashing");
                        Util.sleep(5000);
                        continue;
                    }
                    file.seek(chunk.range.startOffset);
                    file.readFully(buffer);
                    block = new Block(chunk, buffer);
                } catch (IOException e) {
                    LOGGER.warn("Could not read file chunk " + chunk.getChunkIndex() + ", skipping", e);
                    block = new Block(chunk, new byte[0]);
                }
                if (isCanceled) {
                    LOGGER.debug("Cancelled chunk reader (2)");
                    break;
                }
                if (System.currentTimeMillis() > nextReadMsg) {
                    nextReadMsg = System.currentTimeMillis() + 30000;
                    LOGGER.debug("Read chunk " + chunk.getChunkIndex());
                }
                for (;;) {
                    if (HASH_WORK_POOL.isTerminating() || HASH_WORK_POOL.isTerminated()
                            || HASH_WORK_POOL.isShutdown()) {
                        LOGGER.warn("Aborting current hash job - pool has shut down");
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        HASH_WORK_POOL.execute(block);
                        // Don't hash too furiously in the background if the upload didn't start yet
                        if (uploadToken == null && chunk.getChunkIndex() > 4) {
                            Util.sleep(200);
                        }
                    } catch (RejectedExecutionException e) {
                        LOGGER.warn("Hash pool worker rejected a hash job!? Retrying...");
                        Util.sleep(1000);
                        continue;
                    }
                    break;
                }
            }
        } finally {
            Util.safeClose(file);
            readingDone = true;
            cleanupIfDone();
        }
    }

    public void cancel() {
        LOGGER.debug("Cancelled externally");
        isCanceled = true;
    }

    /**
     * Worker for hashing a chunk. Processed via the thread pool.
     */
    private class Block implements Runnable {
        public final FileChunk chunk;
        public final byte[] buffer;

        public Block(FileChunk chunk, byte[] buffer) {
            this.chunk = chunk;
            this.buffer = buffer;
        }

        @Override
        public void run() {
            if (buffer.length < chunk.range.getLength()) {
                // This chunk could not be read and was queued with an empty
                // buffer; skip it rather than letting MessageDigest.update()
                // throw on the short buffer.
                return;
            }
            MessageDigest digester = SHA1_DIGESTER.get();
            digester.update(buffer, 0, chunk.range.getLength());
            byte[] hash = digester.digest();
            hashDone(chunk, hash);
        }
    }
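    /*
     * Note on ordering: chunks can finish hashing out of order, since the
     * pool runs multiple workers. hashDone() pads chunkHashes with nulls up
     * to the incoming chunk's index and advances finishedChunks only across
     * the gap-free prefix. Illustrative trace for completion order 0, 2, 1:
     *
     *   done(0): chunkHashes = [h0]              finishedChunks = 1
     *   done(2): chunkHashes = [h0, null, h2]    finishedChunks = 1
     *   done(1): chunkHashes = [h0, h1, h2]      finishedChunks = 3
     */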
    /**
     * Called by a worker thread when a chunk has been hashed. This means this
     * method is not running in the AsyncHashGenerator thread itself, but in
     * one of the pool workers.
     *
     * @param chunk the chunk that has just been hashed
     * @param hash SHA-1 digest of the chunk's data
     */
    private void hashDone(FileChunk chunk, byte[] hash) {
        int chunkIndex = chunk.getChunkIndex();
        boolean wasLastChunk = false;
        if (System.currentTimeMillis() > nextDoneMsg) {
            nextDoneMsg = System.currentTimeMillis() + 30000;
            LOGGER.debug("Done hashing chunk " + chunkIndex);
        }
        synchronized (chunkHashes) {
            // Insert the hash at the chunk's index, padding with nulls if
            // earlier chunks haven't been hashed yet
            while (chunkHashes.size() < chunkIndex) {
                chunkHashes.add(null);
            }
            if (chunkHashes.size() == chunkIndex) {
                chunkHashes.add(ByteBuffer.wrap(hash));
            } else {
                chunkHashes.set(chunkIndex, ByteBuffer.wrap(hash));
            }
            if (chunkIndex == finishedChunks) {
                while (finishedChunks < chunkHashes.size() && chunkHashes.get(finishedChunks) != null) {
                    finishedChunks++;
                    if (finishedChunks == chunkList.size()) {
                        wasLastChunk = true;
                    }
                }
            }
            if (chunkIndex + 1 == chunkList.size()) {
                LOGGER.debug("Hashed last chunk #" + chunkIndex + ", total=" + chunkList.size()
                        + ", finished=" + finishedChunks);
            }
        }
        if (wasLastChunk) {
            // Last chunk was hashed - make sure the list gets to the server.
            // Try up to 10 times
            LOGGER.debug("Hashing done");
            for (int i = 0; i < 10; ++i) {
                if (submitHashes(true)) {
                    LOGGER.debug("Hashes sent to server");
                    break;
                }
                LOGGER.debug("Sending hashes failed...");
                if (!Util.sleep(2000))
                    break; // Interrupted
            }
        } else if (chunkIndex % 20 == 0) {
            // Mid-hashing - update server side
            if (!submitHashes(false)) {
                LOGGER.warn("Server rejected partial block hash list");
                isCanceled = true;
            }
        }
        if (wasLastChunk) {
            hashingDone = true;
            cleanupIfDone();
        }
    }

    /**
     * Drop references to all the chunk metadata - in case someone is still
     * holding a reference to this class, at least we will not prevent this
     * stuff from being garbage collected.
     */
    private void cleanupIfDone() {
        if (uploadToken == null && !isCanceled)
            return;
        if (!readingDone)
            return;
        if (!hashingDone && !isCanceled)
            return;
        chunkHashes.clear();
        chunkList.clear();
    }

    /**
     * Submit the current list of hashes to the server.
     *
     * @param mustSucceed if true, any failed call counts as failure; otherwise
     *            only an invalid upload token does
     * @return false if the server reports the upload token as invalid, or if
     *         mustSucceed is set and the call failed for any reason
     */
    private boolean submitHashes(boolean mustSucceed) {
        List<ByteBuffer> subList;
        boolean doLog; // Rate-limited debug logging
        synchronized (chunkHashes) {
            subList = new ArrayList<>(chunkHashes.subList(0, finishedChunks));
            doLog = (finishedChunks == chunkList.size());
        }
        if (!doLog) {
            doLog = System.currentTimeMillis() > nextSendingMsg;
        }
        if (doLog) {
            nextSendingMsg = System.currentTimeMillis() + 30000;
            LOGGER.debug("Preparing to send hash list to server (" + subList.size() + " / "
                    + (uploadToken != null) + ")");
        }
        if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list
            return true;
        try {
            if (doLog)
                LOGGER.debug("Making updateBlockHashes call");
            ThriftManager.getSatClient().updateBlockHashes(uploadToken, subList, Session.getSatelliteToken());
            if (doLog)
                LOGGER.debug("updateBlockHashes call succeeded");
        } catch (TInvalidTokenException e) {
            LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!");
            isCanceled = true;
            return false;
        } catch (Exception e) {
            LOGGER.warn("Unknown exception when submitting hashList to sat", e);
            if (mustSucceed)
                return false;
        }
        return true;
    }
}