package org.openslx.dozmod.filetransfer;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.dozmod.thrift.Session;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.GrowingThreadPoolExecutor;
import org.openslx.util.PrioThreadFactory;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;
import org.openslx.util.Util;
public class AsyncHashGenerator extends Thread {
private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class);
private static final ThreadPoolExecutor pool = new GrowingThreadPoolExecutor(1, Runtime.getRuntime()
.availableProcessors(), 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(4),
new PrioThreadFactory("HashGen"));
private static final ThreadLocal<MessageDigest> sha1 = new ThreadLocal<MessageDigest>() {
@Override
protected MessageDigest initialValue() {
try {
return MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e) {
LOGGER.warn("No SHA-1 MD available. Cannot hash file", e);
return null;
}
}
};
private String uploadToken = null;
private int finishedBlocks = 0;
private final RandomAccessFile file;
private final List<ByteBuffer> blockHashes;
private final List<FileChunk> list;
private volatile boolean isCanceled = false;
public AsyncHashGenerator(File uploadFile) throws FileNotFoundException, NoSuchAlgorithmException {
try {
file = new RandomAccessFile(uploadFile, "r");
} catch (FileNotFoundException e) {
LOGGER.warn("Could not open file for hash-checking. Will not send checksums to satellite", e);
throw e;
}
list = new ArrayList<>();
FileChunk.createChunkList(list, uploadFile.length(), null);
blockHashes = new ArrayList<>(list.size());
setDaemon(true);
setName("HashGenerator");
}
public void setUploadToken(String token) {
if (!isCanceled && this.uploadToken == null) {
this.uploadToken = token;
submitHashes(false);
}
}
@Override
public void run() {
try {
for (FileChunk chunk : list) {
if (isCanceled) {
LOGGER.debug("Cancelled chunk reader (1)");
break;
}
Block block;
try {
byte[] buffer = new byte[chunk.range.getLength()];
file.seek(chunk.range.startOffset);
file.readFully(buffer);
block = new Block(chunk, buffer);
} catch (IOException e) {
LOGGER.warn("Could not read file chunk " + chunk.getChunkIndex() + ", skipping", e);
block = new Block(chunk, new byte[0]);
}
if (isCanceled) {
LOGGER.debug("Cancelled chunk reader (2)");
break;
}
for (;;) {
if (pool.isTerminating() || pool.isTerminated()) {
Thread.currentThread().interrupt();
return;
}
try {
pool.execute(block);
// Don't hash too furiously in the background if the upload didn't start yet
if (uploadToken == null && chunk.range.startOffset > FileChunk.CHUNK_SIZE * 4) {
Util.sleep(200);
}
} catch (RejectedExecutionException e) {
Util.sleep(100);
continue;
}
break;
}
}
} finally {
Util.safeClose(file);
}
}
public void cancel() {
LOGGER.debug("Cancelled externally");
isCanceled = true;
}
private class Block implements Runnable {
public final FileChunk chunk;
public final byte[] buffer;
public Block(FileChunk chunk, byte[] buffer) {
this.chunk = chunk;
this.buffer = buffer;
}
@Override
public void run() {
MessageDigest digester = sha1.get();
digester.update(buffer, 0, chunk.range.getLength());
byte[] hash = digester.digest();
hashDone(chunk, hash);
}
}
/**
* Called by worker thread when a block has been hashed.
* This means this method is not running in the currentAsyncHashGenerator
* thread but one of the workers.
*
* @param chunk
* @param hash
*/
private void hashDone(FileChunk chunk, byte[] hash) {
int blockIndex = chunk.getChunkIndex();
synchronized (blockHashes) {
while (blockHashes.size() < blockIndex) {
blockHashes.add(null);
}
if (blockHashes.size() == blockIndex) {
blockHashes.add(ByteBuffer.wrap(hash));
} else {
blockHashes.set(blockIndex, ByteBuffer.wrap(hash));
}
if (blockIndex == finishedBlocks) {
while (finishedBlocks < blockHashes.size() && blockHashes.get(finishedBlocks) != null) {
finishedBlocks++;
}
}
}
if (blockIndex % 20 == 0 || finishedBlocks == list.size()) {
if (blockIndex + 1 == list.size()) {
// Last block was hashed - make sure list gets to the server
LOGGER.debug("Hashing done");
for (int i = 0; i < 10; ++i) {
if (submitHashes(true)) {
LOGGER.debug("Hashes sent to server");
break;
}
LOGGER.debug("Sending hashes failed...");
try {
Thread.sleep(2000);
continue;
} catch (InterruptedException e) {
interrupt();
return;
}
}
return;
}
// Mid-hashing - update server side
QuickTimer.scheduleOnce(new Task() {
@Override
public void fire() {
if (!submitHashes(false)) {
LOGGER.warn("Server rejected block hash list");
isCanceled = true;
}
}
});
}
}
/**
* Submit current list of hashes.
*
* @return false if the token is not known to the server
*/
private boolean submitHashes(boolean mustSucceed) {
List<ByteBuffer> subList;
synchronized (blockHashes) {
subList = new ArrayList<>( blockHashes.subList(0, finishedBlocks) );
}
if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list
return true;
try {
ThriftManager.getSatClient().updateBlockHashes(uploadToken, subList, Session.getSatelliteToken());
} catch (TInvalidTokenException e) {
LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!");
isCanceled = true;
return false;
} catch (TException e) {
LOGGER.warn("Unknown exception when submitting hashList to sat", e);
if (mustSucceed)
return false;
}
return true;
}
}