package org.openslx.dozmod.filetransfer; import java.io.File; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.dozmod.thrift.Session; import org.openslx.filetransfer.Transfer; import org.openslx.util.Util; public abstract class TransferTask implements Runnable, TransferEventEmitter { private static final Logger LOGGER = LogManager.getLogger(TransferTask.class); protected static final double BYTES_PER_MIB = 1024 * 1024; protected static final long CHUNK_SIZE = 16 * 1024 * 1024; /** * Update interval of transfer status (speed only) */ protected static final double UPDATE_INTERVAL_SECONDS = 0.6; protected static final int UPDATE_INTERVAL_MS = (int) (UPDATE_INTERVAL_SECONDS * 1000); protected final List transfers = new ArrayList<>(); private final List connectingTransfers = new ArrayList<>(); protected final AtomicInteger consecutiveInitFails = new AtomicInteger(); /** * List of listeners that want to get status updates about the transfer. */ private final List listeners = new ArrayList<>(); private volatile boolean isComplete = false; private volatile boolean isCancelled = false; private boolean endgame = false; private int minConnectionCount = 1; private long lastConnectionAttempt = 0; protected final File localFile; protected final long fileSize; protected TransferTask(File localFile, long fileSize) { this.localFile = localFile; this.fileSize = fileSize; } @Override public final void run() { while (!isCancelled && !Thread.interrupted()) { TransferEvent event = getTransferEvent(); if (event != null) { if (event.state == TransferState.FINISHED) { isComplete = true; } if (event.state == TransferState.ERROR) { isCancelled = true; } fireEvent(event); if (event.state == TransferState.ERROR || event.state == TransferState.FINISHED) { break; } } ensureActivity(); Util.sleep(UPDATE_INTERVAL_MS); } LOGGER.info("Transfer worker mainloop finished"); List joinList = new ArrayList<>(); synchronized (transfers) { isCancelled = true; joinList.addAll(transfers); joinList.addAll(connectingTransfers); } for (TransferThread t : joinList) { Transfer transfer = t.getTransfer(); if (transfer != null) { transfer.cancel(); } t.interrupt(); Util.joinThread(t); } cleanup(); LOGGER.info("Trasfer worker exiting"); } protected void cleanup() { // By default, this does nothing } public boolean isComplete() { return isComplete; } @Override public boolean isCanceled() { return isCancelled; } private void fireEvent(TransferEvent event) { if (event.errorMessage != null) { LOGGER.warn("(" + this.getClass().getSimpleName() + ") fireEvent with error: " + event.errorMessage); } synchronized (listeners) { for (int i = listeners.size() - 1; i >= 0; --i) { listeners.get(i).update(event); } } } protected void fireErrorMessage(String message) { TransferEvent event = new TransferEvent(null, null, 0, 0, 0, message); fireEvent(event); } protected abstract TransferEvent getTransferEvent(); public final void setMinConnections(int count) { if (Session.getSatelliteConfig() != null && Session.getSatelliteConfig().isSetMaxConnectionsPerTransfer() && Session.getSatelliteConfig().getMaxConnectionsPerTransfer() > 0 && Session.getSatelliteConfig().getMaxConnectionsPerTransfer() < count) { count = Session.getSatelliteConfig().getMaxConnectionsPerTransfer(); } synchronized (transfers) { this.minConnectionCount = count; } } @Override public void addListener(TransferEventListener listener) { synchronized (listeners) { listeners.add(listener); } } @Override public void removeListener(TransferEventListener listener) { synchronized (listeners) { while (listeners.remove(listener)) { } } } public void cancel() { final List joiners; synchronized (transfers) { if (isCancelled) return; isCancelled = true; joiners = new ArrayList<>(); joiners.addAll(transfers); joiners.addAll(connectingTransfers); } for (TransferThread t : joiners) { if (t.getTransfer() != null) { t.getTransfer().cancel(); } } } private final void ensureActivity() { synchronized (transfers) { if (isCancelled || isComplete) return; if (endgame && (!transfers.isEmpty() || !connectingTransfers.isEmpty())) return; Iterator it = transfers.iterator(); while (it.hasNext()) { if (!it.next().getTransfer().isValid()) { it.remove(); } } if (transfers.size() + connectingTransfers.size() >= minConnectionCount) return; long now = System.currentTimeMillis(); int fails = consecutiveInitFails.get(); if (lastConnectionAttempt + fails * 10000 > now) return; lastConnectionAttempt = now; TransferThread thread = createNewThread(); if (thread != null) { thread.setDaemon(true); connectingTransfers.add(thread); thread.start(); } } } protected abstract TransferThread createNewThread(); protected final void connectFailed(TransferThread thread) { synchronized (transfers) { connectingTransfers.remove(thread); LOGGER.info("Establishing new transfer connection failed, [a:" + transfers.size() + "/c:" + connectingTransfers.size() + "]"); } } protected final void connectSucceeded(TransferThread thread) { synchronized (transfers) { connectingTransfers.remove(thread); if (!isCancelled) { transfers.add(thread); LOGGER.info("Establishing new transfer connection succeeded, [a:" + transfers.size() + "/c:" + connectingTransfers.size() + "]"); return; } } thread.getTransfer().cancel(); thread.interrupt(); } protected final void transferEnded(TransferThread thread, boolean success) { synchronized (transfers) { transfers.remove(thread); LOGGER.info("A transfer connection has finished (success=" + success + "), [a:" + transfers.size() + "/c:" + connectingTransfers.size() + "]"); if (endgame && !success && transfers.isEmpty()) { // We had a transfer that reported success before, so we assume there are no more pending blocks // not being actively transfered already. Only trigger a new upload if this was the last active // transfer and it failed. This also resets endgame mode. LOGGER.debug("Disabled endgame mode"); endgame = false; } else if (!endgame && success) { LOGGER.debug("Enabled endgame mode"); endgame = true; } if (success && transfers.isEmpty()) { LOGGER.debug("Transfer might have finished, waiting for server"); lastConnectionAttempt = System.currentTimeMillis() + 3000; // Throttle reconnects return; // Skip ensureActivity check } } ensureActivity(); } /** * Get the number of consecutive connection fails. This counter is only * increased if there is no active transfer running, and is reset as soon as * one transfer successfully connects. * * @return connect fails */ public final int getFailCount() { return consecutiveInitFails.get(); } public int getTransferCount() { synchronized (transfers) { return transfers.size(); } } public File getFile() { return localFile; } protected abstract static class TransferThread extends Thread { @Override public abstract void run(); protected abstract Transfer getTransfer(); public abstract long getCurrentSpeed(); } }