package org.openslx.dozmod.filetransfer;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.util.List;
import org.apache.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.dozmod.Config;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.Transfer;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.filetransfer.util.ChunkList;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.util.Util;
/**
* Execute file download in a background thread and update the progress.
*/
public class DownloadTask extends TransferTask {
/**
* Logger instance for this class.
*/
private final static Logger LOGGER = Logger.getLogger(DownloadTask.class);
private final String host;
private final int port;
private final String downloadToken;
private final RandomAccessFile fileHandle;
private final ChunkList chunks;
private final long startTime;
private boolean fileWritable = true;
public DownloadTask(String host, int port, String downloadToken, File destinationFile, long fileSize,
List<byte[]> sha1Sums) throws FileNotFoundException {
super(destinationFile, fileSize);
this.host = host;
this.port = port;
this.downloadToken = downloadToken;
this.fileHandle = new RandomAccessFile(destinationFile, "rw");
this.chunks = new ChunkList(fileSize, sha1Sums);
this.startTime = System.currentTimeMillis();
}
private class DownloadHandler implements WantRangeCallback, DataReceivedCallback {
private FileChunk current = null;
private byte[] buffer = null;
// progress counter
private long currentSpeed = 0;
private long currentBytes = 0;
private long lastUpdate = 0;
private long lastBytes = 0;
@Override
public FileRange get() {
handleCompletedChunk(current, buffer);
consecutiveInitFails.lazySet(0);
try {
current = chunks.getMissing();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
if (current == null)
return null;
buffer = new byte[current.range.getLength()];
return current.range;
}
@Override
public boolean dataReceived(final long fileOffset, final int dataLength, final byte[] data) {
if (current == null)
throw new IllegalStateException("dataReceived without current chunk");
if (!current.range.contains(fileOffset, fileOffset + dataLength))
throw new IllegalStateException("dataReceived with file data out of range");
System.arraycopy(data, 0, buffer, (int) (fileOffset - current.range.startOffset), dataLength);
currentBytes += dataLength;
final long now = System.currentTimeMillis();
if (lastUpdate + UPDATE_INTERVAL_MS < now) {
synchronized (this) {
// Calculate updated speed
lastBytes = (lastBytes * 2 + currentBytes) / 3;
currentSpeed = (1000 * lastBytes) / (now - lastUpdate);
lastUpdate = now;
}
// Reset counters
currentBytes = 0;
}
return fileWritable;
}
private long getCurrentSpeed() {
synchronized (this) {
return currentSpeed;
}
}
}
private void handleCompletedChunk(FileChunk chunk, byte[] buffer) {
if (chunk == null)
return;
// TODO: Hash check, async
try {
synchronized (fileHandle) {
fileHandle.seek(chunk.range.startOffset);
fileHandle.write(buffer, 0, chunk.range.getLength());
}
chunks.markCompleted(chunk, true);
} catch (Exception e) {
LOGGER.error("Could not write to file at offset " + chunk.range.startOffset, e);
fileWritable = false;
}
}
private class DownloadThread extends TransferThread {
private Downloader downloader = null;
private DownloadHandler cb = new DownloadHandler();
@Override
public void run() {
try {
downloader = new Downloader(host, port, Config.TRANSFER_TIMEOUT, null, downloadToken);
} catch (Exception e) {
LOGGER.warn("Could not initialize new uploader", e);
consecutiveInitFails.incrementAndGet();
connectFailed(this);
return;
} // TODO: SSL
connectSucceeded(this);
boolean ret = downloader.download(cb, cb);
if (!ret) {
consecutiveInitFails.incrementAndGet();
}
if (cb.current != null) {
chunks.markFailed(cb.current);
}
transferEnded(this, ret);
}
@Override
protected Transfer getTransfer() {
return downloader;
}
@Override
public long getCurrentSpeed() {
return cb.getCurrentSpeed();
}
}
@Override
protected void cleanup() {
Util.safeClose(fileHandle);
}
@Override
protected TransferEvent getTransferEvent() {
final TransferState state;
final byte[] progress = chunks.getStatusArray().array();
final String error;
if (consecutiveInitFails.get() > 20) {
state = TransferState.ERROR;
error = "Cannot talk to server after 20 tries...";
} else if (chunks.isComplete() && getTransferCount() == 0) {
Util.safeClose(fileHandle);
state = TransferState.FINISHED;
error = null;
} else {
state = TransferState.WORKING;
error = null;
}
long speed = 0;
long timeRemaining = 0;
long virtualSpeed = 0;
synchronized (transfers) {
for (TransferThread thread : transfers) {
speed += thread.getCurrentSpeed();
}
}
// 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying
if (progress != null) {
int missing = 0;
for (byte b : progress) {
if (b != 0) {
missing++;
}
}
final long bytesRemaining = CHUNK_SIZE * (long) missing;
timeRemaining = (1000 * bytesRemaining) / (speed + 1);
virtualSpeed = ((progress.length - missing) * CHUNK_SIZE * 1000) / (System.currentTimeMillis() - startTime + 1);
}
return new TransferEvent(state, progress, speed, virtualSpeed, timeRemaining, error);
}
@Override
protected TransferThread createNewThread() {
return new DownloadThread();
}
}