package org.openslx.dozmod.filetransfer; import java.io.File; import java.io.FileNotFoundException; import java.io.RandomAccessFile; import java.util.List; import org.apache.log4j.Logger; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.dozmod.Config; import org.openslx.filetransfer.DataReceivedCallback; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.FileRange; import org.openslx.filetransfer.Transfer; import org.openslx.filetransfer.WantRangeCallback; import org.openslx.filetransfer.util.ChunkList; import org.openslx.filetransfer.util.FileChunk; import org.openslx.util.Util; /** * Execute file download in a background thread and update the progress. */ public class DownloadTask extends TransferTask { /** * Logger instance for this class. */ private final static Logger LOGGER = Logger.getLogger(DownloadTask.class); private final String host; private final int port; private final String downloadToken; private final RandomAccessFile fileHandle; private final ChunkList chunks; private final long startTime; private boolean fileWritable = true; public DownloadTask(String host, int port, String downloadToken, File destinationFile, long fileSize, List sha1Sums) throws FileNotFoundException { super(destinationFile, fileSize); this.host = host; this.port = port; this.downloadToken = downloadToken; this.fileHandle = new RandomAccessFile(destinationFile, "rw"); this.chunks = new ChunkList(fileSize, sha1Sums); this.startTime = System.currentTimeMillis(); } private class DownloadHandler implements WantRangeCallback, DataReceivedCallback { private FileChunk current = null; private byte[] buffer = null; // progress counter private long currentSpeed = 0; private long currentBytes = 0; private long lastUpdate = 0; private long lastBytes = 0; @Override public FileRange get() { handleCompletedChunk(current, buffer); consecutiveInitFails.lazySet(0); try { current = chunks.getMissing(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } if (current == null) return null; buffer = new byte[current.range.getLength()]; return current.range; } @Override public boolean dataReceived(final long fileOffset, final int dataLength, final byte[] data) { if (current == null) throw new IllegalStateException("dataReceived without current chunk"); if (!current.range.contains(fileOffset, fileOffset + dataLength)) throw new IllegalStateException("dataReceived with file data out of range"); System.arraycopy(data, 0, buffer, (int) (fileOffset - current.range.startOffset), dataLength); currentBytes += dataLength; final long now = System.currentTimeMillis(); if (lastUpdate + UPDATE_INTERVAL_MS < now) { synchronized (this) { // Calculate updated speed lastBytes = (lastBytes * 2 + currentBytes) / 3; currentSpeed = (1000 * lastBytes) / (now - lastUpdate); lastUpdate = now; } // Reset counters currentBytes = 0; } return fileWritable; } private long getCurrentSpeed() { synchronized (this) { return currentSpeed; } } } private void handleCompletedChunk(FileChunk chunk, byte[] buffer) { if (chunk == null) return; // TODO: Hash check, async try { synchronized (fileHandle) { fileHandle.seek(chunk.range.startOffset); fileHandle.write(buffer, 0, chunk.range.getLength()); } chunks.markCompleted(chunk, true); } catch (Exception e) { LOGGER.error("Could not write to file at offset " + chunk.range.startOffset, e); fileWritable = false; } } private class DownloadThread extends TransferThread { private Downloader downloader = null; private DownloadHandler cb = new DownloadHandler(); @Override public void run() { try { downloader = new Downloader(host, port, Config.TRANSFER_TIMEOUT, null, downloadToken); } catch (Exception e) { LOGGER.warn("Could not initialize new uploader", e); consecutiveInitFails.incrementAndGet(); connectFailed(this); return; } // TODO: SSL connectSucceeded(this); boolean ret = downloader.download(cb, cb); if (!ret) { consecutiveInitFails.incrementAndGet(); } if (cb.current != null) { chunks.markFailed(cb.current); } transferEnded(this, ret); } @Override protected Transfer getTransfer() { return downloader; } @Override public long getCurrentSpeed() { return cb.getCurrentSpeed(); } } @Override protected void cleanup() { Util.safeClose(fileHandle); } @Override protected TransferEvent getTransferEvent() { final TransferState state; final byte[] progress = chunks.getStatusArray().array(); final String error; if (consecutiveInitFails.get() > 20) { state = TransferState.ERROR; error = "Cannot talk to server after 20 tries..."; } else if (chunks.isComplete() && getTransferCount() == 0) { Util.safeClose(fileHandle); state = TransferState.FINISHED; error = null; } else { state = TransferState.WORKING; error = null; } long speed = 0; long timeRemaining = 0; long virtualSpeed = 0; synchronized (transfers) { for (TransferThread thread : transfers) { speed += thread.getCurrentSpeed(); } } // 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying if (progress != null) { int missing = 0; for (byte b : progress) { if (b != 0) { missing++; } } final long bytesRemaining = CHUNK_SIZE * (long) missing; timeRemaining = (1000 * bytesRemaining) / (speed + 1); virtualSpeed = ((progress.length - missing) * CHUNK_SIZE * 1000) / (System.currentTimeMillis() - startTime + 1); } return new TransferEvent(state, progress, speed, virtualSpeed, timeRemaining, error); } @Override protected TransferThread createNewThread() { return new DownloadThread(); } }