package org.openslx.dozmod.filetransfer;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.dozmod.thrift.Session;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.GrowingThreadPoolExecutor;
import org.openslx.util.PrioThreadFactory;
import org.openslx.util.Util;
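/**
 * Hashes a file's chunks with SHA-1 in the background while it is being
 * uploaded, and periodically pushes the (partial) list of block hashes to the
 * satellite server so uploaded blocks can be verified.
 *
 * Typical usage, as a sketch: the uploadFile and token variables are
 * placeholders for whatever the surrounding upload code provides.
 *
 * <pre>
 * AsyncHashGenerator hasher = new AsyncHashGenerator(uploadFile);
 * hasher.start();
 * // Later, once the upload has been registered and a token exists:
 * hasher.setUploadToken(token);
 * // Progress can be polled via hasher.getCompleteCounter()
 * </pre>
 */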
public class AsyncHashGenerator extends Thread {
private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class);
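	/**
	 * Shared pool for hash computation. The maximum pool size is capped by the
	 * CPU count (leaving one core free) and by heap size, allowing roughly
	 * three chunk-sized buffers per worker. The queue holds only one job and
	 * CallerRunsPolicy supplies back-pressure: if all workers are busy, the
	 * reader thread hashes the chunk itself.
	 */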
private static final ThreadPoolExecutor HASH_WORK_POOL = new GrowingThreadPoolExecutor(1,
Math.max(1, (int)Math.min(Runtime.getRuntime().availableProcessors() - 1,
Runtime.getRuntime().maxMemory() / (FileChunk.CHUNK_SIZE * 3))),
10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1),
new PrioThreadFactory("HashGen"), new ThreadPoolExecutor.CallerRunsPolicy());
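	/**
	 * Per-thread SHA-1 digester, so pool workers neither share nor repeatedly
	 * create MessageDigest instances.
	 */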
private static final ThreadLocal<MessageDigest> SHA1_DIGESTER = new ThreadLocal<MessageDigest>() {
@Override
protected MessageDigest initialValue() {
try {
return MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e) {
LOGGER.warn("No SHA-1 MD available. Cannot hash file", e);
return null;
}
}
};
	private String uploadToken = null; // Set once the upload has been registered with the server
	private int finishedChunks = 0; // Length of the contiguous prefix of already hashed chunks
	private final RandomAccessFile file;
	private final List<ByteBuffer> chunkHashes;
	private final List<FileChunk> chunkList;
	private long nextReadMsg, nextDoneMsg, nextSendingMsg; // Timestamps for rate-limiting debug spam :-(
	private boolean readingDone = false;
	private final AtomicInteger pendingHashes = new AtomicInteger();
	private volatile boolean isCanceled = false;
	private final AtomicInteger completeCount = new AtomicInteger();
static {
LOGGER.info("Using " + HASH_WORK_POOL.getMaximumPoolSize() + " hash workers.");
}
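	/**
	 * @param uploadFile the file to hash
	 * @throws FileNotFoundException if the file cannot be opened for reading
	 * @throws NoSuchAlgorithmException if no SHA-1 implementation is available
	 */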
	public AsyncHashGenerator(File uploadFile) throws FileNotFoundException, NoSuchAlgorithmException {
		// Probe for SHA-1 right away, so the declared exception can actually occur here
		// instead of every worker silently getting a null digester later
		MessageDigest.getInstance("SHA-1");
		try {
file = new RandomAccessFile(uploadFile, "r");
} catch (FileNotFoundException e) {
LOGGER.warn("Could not open file for hash-checking. Will not send checksums to satellite", e);
throw e;
}
LOGGER.debug("Opened file for hashing");
chunkList = new ArrayList<>();
FileChunk.createChunkList(chunkList, uploadFile.length(), null);
chunkHashes = new ArrayList<>(chunkList.size());
setDaemon(true);
setName("HashGenerator");
}
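	/**
	 * Start the hash reader thread. Does nothing if the generator has already
	 * been cancelled.
	 */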
@Override
public synchronized void start() {
if (isCanceled) {
LOGGER.warn("Cannot start hashing if it has been cancelled before");
} else {
super.start();
}
}
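	/**
	 * Set the upload token once it is known. Hashing can start before the
	 * upload is registered, so the token usually arrives while hashing is
	 * already running; setting it triggers the first submission of finished
	 * hashes. The token can only be set once.
	 */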
public void setUploadToken(String token) {
if (!isCanceled && this.uploadToken == null) {
this.uploadToken = token;
submitHashes(false);
}
cleanupIfDone();
}
@Override
public void run() {
LOGGER.debug("Started hash reader worker");
try {
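			// Read chunks sequentially and dispatch each one to the hash pool:
			// disk I/O stays linear while the hashing itself runs in parallel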
for (FileChunk chunk : chunkList) {
if (isCanceled) {
LOGGER.debug("Cancelled chunk reader (1)");
break;
}
Block block;
try {
byte[] buffer = null;
do {
try {
buffer = new byte[chunk.range.getLength()];
						} catch (OutOfMemoryError e) {
							// Allocation can fail while many chunk buffers are in flight; back off and retry
							LOGGER.info("Low memory - slowing down hashing");
							Util.sleep(5000);
}
} while (buffer == null);
file.seek(chunk.range.startOffset);
file.readFully(buffer);
block = new Block(chunk, buffer);
} catch (IOException e) {
LOGGER.warn("Could not read file chunk " + chunk.getChunkIndex() + ", skipping", e);
block = new Block(chunk, new byte[0]);
}
if (isCanceled) {
LOGGER.debug("Cancelled chunk reader (2)");
break;
}
				// Rate-limited progress logging, at most one line per minute
				if (System.currentTimeMillis() > nextReadMsg) {
					nextReadMsg = System.currentTimeMillis() + 60000;
					LOGGER.debug("Read chunk " + chunk.getChunkIndex());
				}
				// Hand the block to the hash pool, retrying if it is rejected
				for (;;) {
					if (HASH_WORK_POOL.isTerminating() || HASH_WORK_POOL.isTerminated()
							|| HASH_WORK_POOL.isShutdown()) {
LOGGER.warn("Aborting current hash job - pool has shut down");
isCanceled = true;
Thread.currentThread().interrupt();
return;
}
try {
pendingHashes.incrementAndGet();
HASH_WORK_POOL.execute(block);
						// Don't hash too furiously in the background if the upload hasn't started yet
if (uploadToken == null && chunk.getChunkIndex() > 4) {
Util.sleep(200);
}
} catch (RejectedExecutionException e) {
pendingHashes.decrementAndGet();
LOGGER.warn("Hash pool worker rejected a hash job!? Retrying...");
Util.sleep(1000);
continue;
}
break;
}
}
} finally {
Util.safeClose(file);
readingDone = true;
cleanupIfDone();
}
}
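	/**
	 * Cancel hashing. Pending work is dropped and resources are released once
	 * the reader thread and the workers notice the flag.
	 */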
public synchronized void cancel() {
LOGGER.debug("Cancelled externally");
isCanceled = true;
}
	/**
	 * Worker for hashing one chunk. Executed via the shared thread pool.
	 */
private class Block implements Runnable {
public final FileChunk chunk;
public byte[] buffer;
public Block(FileChunk chunk, byte[] buffer) {
this.chunk = chunk;
this.buffer = buffer;
}
		@Override
		public void run() {
			MessageDigest digester = SHA1_DIGESTER.get();
			if (digester == null) {
				// No SHA-1 on this JVM; drop the job but keep the pending counter consistent
				pendingHashes.decrementAndGet();
				return;
			}
			// Hash buffer.length bytes, not chunk.range.getLength(): after a failed read the
			// buffer is empty, and hashing past its end would throw inside the worker
			digester.update(buffer, 0, buffer.length);
			this.buffer = null; // Clear reference before calling function below
			byte[] hash = digester.digest();
			if (isCanceled) { // Volatile read, no locking required
				pendingHashes.decrementAndGet();
				return;
			}
			hashDone(chunk, hash);
		}
}
	/**
	 * Called by a worker thread when a chunk has been hashed.
	 * This means this method does not run in the AsyncHashGenerator
	 * thread itself, but in one of the pool workers.
	 *
	 * @param chunk the chunk that was hashed
	 * @param hash SHA-1 hash of the chunk's data
	 */
private void hashDone(FileChunk chunk, byte[] hash) {
int chunkIndex = chunk.getChunkIndex();
boolean wasLastChunk = false;
if (System.currentTimeMillis() > nextDoneMsg) {
nextDoneMsg = System.currentTimeMillis() + 60000;
LOGGER.debug("Done hashing chunk " + chunkIndex);
}
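		// Chunks finish hashing out of order: pad the hash list with null
		// placeholders as needed, then advance finishedChunks over the longest
		// contiguous prefix of non-null hashes - only that prefix is submitted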
synchronized (chunkHashes) {
while (chunkHashes.size() < chunkIndex) {
chunkHashes.add(null);
}
if (chunkHashes.size() == chunkIndex) {
chunkHashes.add(ByteBuffer.wrap(hash));
} else {
chunkHashes.set(chunkIndex, ByteBuffer.wrap(hash));
}
if (chunkIndex == finishedChunks) {
while (finishedChunks < chunkHashes.size() && chunkHashes.get(finishedChunks) != null) {
finishedChunks++;
if (finishedChunks == chunkList.size()) {
wasLastChunk = true;
}
}
completeCount.set(finishedChunks);
}
if (chunkIndex + 1 == chunkList.size()) {
LOGGER.debug("Hashed last chunk #" + chunkIndex + ", total=" + chunkList.size() + ", finished=" + finishedChunks);
}
}
if (wasLastChunk) {
// Last chunk was hashed - make sure list gets to the server
// Try up to 10 times
LOGGER.debug("Hashing done");
for (int i = 0; i < 10; ++i) {
if (submitHashes(true)) {
LOGGER.debug("Hashes sent to server");
break;
}
LOGGER.debug("Sending hashes failed...");
if (!Util.sleep(2000))
break; // Interrupted
}
} else if (chunkIndex % 20 == 0) {
// Mid-hashing - update server side
if (!submitHashes(false)) {
LOGGER.warn("Server rejected partial block hash list");
isCanceled = true;
}
}
if (pendingHashes.decrementAndGet() == 0) {
cleanupIfDone();
}
}
/**
* Drop references to all the chunk metadata - in case someone is still holding
* a reference to this class, at least we will not prevent this stuff from being
* garbage collected.
*/
	private synchronized void cleanupIfDone() {
		if (!readingDone && isAlive()) // Reader thread is still working
			return;
		if (uploadToken == null && !isCanceled) // Token might still arrive, hashes would be needed then
			return;
		if (pendingHashes.get() != 0) // Workers are still hashing
			return;
isCanceled = true;
chunkHashes.clear();
chunkList.clear();
LOGGER.debug("Hasher cleaned up");
}
	/**
	 * @return true if this instance is not doing anything meaningful anymore
	 *         and no reference to it needs to be kept around.
	 */
public boolean canBeDiscarded() {
return isCanceled || (!isAlive() && pendingHashes.get() == 0);
}
	/**
	 * Submit the current list of finished chunk hashes to the server.
	 *
	 * @param mustSucceed if true, any communication error counts as failure
	 * @return false if the server claims the upload token is invalid, or if
	 *         submission failed while mustSucceed is set
	 */
	private boolean submitHashes(boolean mustSucceed) {
		if (isCanceled)
			return true;
		List<ByteBuffer> subList;
		boolean doLog;
		synchronized (chunkHashes) {
			// Only the contiguous prefix of finished hashes is ever submitted
			subList = new ArrayList<>(chunkHashes.subList(0, finishedChunks));
			doLog = (finishedChunks == chunkList.size());
		}
		if (!doLog) {
			doLog = System.currentTimeMillis() > nextSendingMsg;
		}
		if (doLog) {
			nextSendingMsg = System.currentTimeMillis() + 60000;
			LOGGER.debug("Preparing to send hash list to server (" + subList.size() + " / "
					+ (uploadToken != null) + ")");
		}
		if (uploadToken == null || subList.isEmpty()) // No token yet, or nothing to submit
			return true;
		try {
			if (doLog)
				LOGGER.debug("Making updateBlockHashes call");
			ThriftManager.getSatClient().updateBlockHashes(uploadToken, subList, Session.getSatelliteToken());
			if (doLog)
				LOGGER.debug("updateBlockHashes call succeeded");
} catch (TInvalidTokenException e) {
LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!");
isCanceled = true;
return false;
} catch (Exception e) {
LOGGER.warn("Unknown exception when submitting hashList to sat", e);
if (mustSucceed)
return false;
}
return true;
}
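	/**
	 * @return live counter of contiguously finished chunk hashes; updated by
	 *         the hash workers, so callers can poll it to display progress
	 */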
public AtomicInteger getCompleteCounter() {
return completeCount;
}
}