package org.openslx.dozmod.filetransfer;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.dozmod.Config;
import org.openslx.filetransfer.Transfer;
import org.openslx.filetransfer.UploadStatusCallback;
import org.openslx.filetransfer.Uploader;
import org.openslx.thrifthelper.ThriftManager;

/**
 * Executes the file upload in a background thread and updates progress to
 * listeners.
 */
public class UploadTask extends TransferTask {

	/**
	 * Logger instance for this class.
	 */
	private static final Logger LOGGER = LogManager.getLogger(UploadTask.class);

	/**
	 * Update interval of the block progress (needs thrift call to sat)
	 */
	private static final double THRIFT_INTERVAL_SECONDS = 1.7;
	private static final int THRIFT_INTERVAL_MS = (int) (THRIFT_INTERVAL_SECONDS * 1000);

	private final String host;
	private final int port;
	private final String uploadToken;
	private final long startTime;

	/**
	 * Last error reported by the remote side, or null if the most recently
	 * finished connection succeeded. Written by the upload threads and read
	 * in {@link #getTransferEvent()}, hence volatile.
	 */
	private volatile String remoteError = null;

	/**
	 * Keep track of the number of active upload connections
	 */
	private static final AtomicInteger numConnections = new AtomicInteger();

	/**
	 * Get the number of active upload workers. This counts individual
	 * upload connections, not logical uploads which might use more than
	 * one connection at a time.
	 *
	 * @return current number of running upload threads
	 */
	public static int getNumberOfUploads() {
		return numConnections.get();
	}

	/**
	 * Create a new upload task for the given file.
	 *
	 * @param host host name or address passed to each {@link Uploader}
	 * @param port port passed to each {@link Uploader}
	 * @param uploadToken token identifying this upload; also used to query the
	 *            upload status via thrift
	 * @param uploadFile local file to upload
	 * @throws FileNotFoundException if the file cannot be read
	 */
	public UploadTask(String host, int port, String uploadToken, File uploadFile) throws FileNotFoundException {
		super(uploadFile, uploadFile.length());
		if (!uploadFile.canRead()) {
			throw new FileNotFoundException("Cannot read upload file: " + uploadFile.getAbsolutePath());
		}
		// TODO: SSL
		this.host = host;
		this.port = port;
		this.uploadToken = uploadToken;
		this.startTime = System.currentTimeMillis();
	}

	/**
	 * A single upload connection. Counts itself in {@link #numConnections}
	 * for as long as it is running.
	 */
	private class UploadThread extends TransferThread {
		// private long totalBytesRead = 0;
		/** Smoothed speed in bytes per second; guarded by this thread's monitor. */
		private long currentSpeed = 0;
		private Uploader uploader = null;

		@Override
		public void run() {
			numConnections.incrementAndGet();
			try {
				run2();
			} finally {
				// Always release the slot, even if run2 throws
				numConnections.decrementAndGet();
			}
		}

		/**
		 * Connect to the server, upload the file and report the outcome via
		 * connectFailed/connectSucceeded and transferEnded.
		 */
		public void run2() {
			try {
				uploader = new Uploader(host, port, Config.TRANSFER_TIMEOUT, null, uploadToken); // TODO: SSL
			} catch (Exception e) {
				LOGGER.warn("Could not initialize new uploader", e);
				consecutiveInitFails.incrementAndGet();
				connectFailed(this);
				return;
			}
			connectSucceeded(this);

			final UploadThread thread = this;
			final boolean ret = uploader.upload(localFile.getAbsolutePath(), new UploadStatusCallback() {
				// progress counter
				private long currentBytes = 0;
				private long lastUpdate = 0;
				private long lastBytes = 0;

				@Override
				public void uploadProgress(long bytesSent) {
					currentBytes += bytesSent;
					final long now = System.currentTimeMillis();
					if (lastUpdate + UPDATE_INTERVAL_MS < now) {
						synchronized (thread) {
							// Calculate updated speed: weighted average of the
							// previous value (2/3) and the bytes of this interval (1/3)
							// totalBytesRead += currentBytes;
							lastBytes = (lastBytes * 2 + currentBytes) / 3;
							currentSpeed = (1000 * lastBytes) / (now - lastUpdate);
							lastUpdate = now;
						}
						// Reset counters
						currentBytes = 0;
					}
				}

				@Override
				public void uploadError(String message) {
					fireErrorMessage(message);
				}
			});

			if (ret) {
				remoteError = null;
				consecutiveInitFails.set(0);
			} else {
				final String err = uploader.getRemoteError();
				// Only log when the error differs from the last one remembered
				if (err != null && !err.equals(remoteError)) {
					LOGGER.warn("Upload task remote error: {}", err);
					remoteError = err;
				}
				consecutiveInitFails.incrementAndGet();
			}
			transferEnded(this, ret);
		}

		@Override
		public long getCurrentSpeed() {
			synchronized (this) {
				return currentSpeed;
			}
		}

		@Override
		protected Transfer getTransfer() {
			return uploader;
		}
	}

	private long lastThriftUpdate = 0;
	private long virtualSpeed = 0;

	/**
	 * Build a progress event for this upload. At most once per
	 * {@link #THRIFT_INTERVAL_MS} the upload status (state and block list) is
	 * queried via thrift; in between, state and blocks of the event are null.
	 * The speed is the sum of the current speed of all transfer threads.
	 *
	 * @return event describing the current state of this upload
	 */
	@Override
	protected TransferEvent getTransferEvent() {
		final long now = System.currentTimeMillis();
		TransferState state = null;
		byte[] blocks = null;
		String error = null;
		if (lastThriftUpdate + THRIFT_INTERVAL_MS < now) {
			lastThriftUpdate = now;
			try {
				TransferStatus uploadStatus = ThriftManager.getSatClient().queryUploadStatus(uploadToken);
				state = uploadStatus.getState();
				blocks = uploadStatus.getBlockStatus();
			} catch (TInvalidTokenException e) {
				error = "Upload token unknown!?";
				state = TransferState.ERROR;
			} catch (TException e) {
				error = "Exception quering upload status: " + e.toString();
			}
		}
		long speed = 0;
		long timeRemaining = 0;
		synchronized (transfers) {
			for (TransferThread thread : transfers) {
				speed += thread.getCurrentSpeed();
			}
		}
		if (speed != 0 && blocks != null) {
			// 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying, 5 = hashing
			int missing = 0;
			for (byte b : blocks) {
				if (b != 0) {
					missing++;
				}
			}
			final long bytesRemaining = CHUNK_SIZE * (long) missing;
			timeRemaining = (1000 * bytesRemaining) / (speed + 1);
			// Widen to long before multiplying, so the product cannot overflow
			// in case CHUNK_SIZE is declared as int
			final long bytesCompleted = (long) (blocks.length - missing) * CHUNK_SIZE;
			// +1 avoids a division by zero right after the task was created
			virtualSpeed = (bytesCompleted * 1000) / (System.currentTimeMillis() - startTime + 1);
		}
		// Read the shared field once so the null check and comparison agree
		final String remote = remoteError;
		if (remote != null && (error == null || remote.equals("Out of disk space"))) {
			error = remote;
		}
		return new TransferEvent(state, blocks, speed, virtualSpeed, timeRemaining, error);
	}

	@Override
	protected TransferThread createNewThread() {
		return new UploadThread();
	}

}