blob: cca429298ce87c8afd15335a560452ff5b869511 (
plain) (
tree)
|
|
package org.openslx.dozmod.filetransfer;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.dozmod.thrift.Session;
import org.openslx.filetransfer.Transfer;
import org.openslx.util.Util;
public abstract class TransferTask implements Runnable, TransferEventEmitter {
private static final Logger LOGGER = LogManager.getLogger(TransferTask.class);
protected static final double BYTES_PER_MIB = 1024 * 1024;
protected static final long CHUNK_SIZE = 16 * 1024 * 1024;
/**
* Update interval of transfer status (speed only)
*/
protected static final double UPDATE_INTERVAL_SECONDS = 0.6;
protected static final int UPDATE_INTERVAL_MS = (int) (UPDATE_INTERVAL_SECONDS * 1000);
protected final List<TransferThread> transfers = new ArrayList<>();
private final List<TransferThread> connectingTransfers = new ArrayList<>();
protected final AtomicInteger consecutiveInitFails = new AtomicInteger();
/**
* List of listeners that want to get status updates about the transfer.
*/
private final List<TransferEventListener> listeners = new ArrayList<>();
private volatile boolean isComplete = false;
private volatile boolean isCancelled = false;
private boolean endgame = false;
private int minConnectionCount = 1;
private long lastConnectionAttempt = 0;
protected final File localFile;
protected final long fileSize;
protected TransferTask(File localFile, long fileSize) {
this.localFile = localFile;
this.fileSize = fileSize;
}
@Override
public final void run() {
while (!isCancelled && !Thread.interrupted()) {
TransferEvent event = getTransferEvent();
if (event != null) {
if (event.state == TransferState.FINISHED) {
isComplete = true;
}
if (event.state == TransferState.ERROR) {
isCancelled = true;
}
fireEvent(event);
if (event.state == TransferState.ERROR || event.state == TransferState.FINISHED) {
break;
}
}
ensureActivity();
Util.sleep(UPDATE_INTERVAL_MS);
}
LOGGER.info("Transfer worker mainloop finished");
List<TransferThread> joinList = new ArrayList<>();
synchronized (transfers) {
isCancelled = true;
joinList.addAll(transfers);
joinList.addAll(connectingTransfers);
}
for (TransferThread t : joinList) {
Transfer transfer = t.getTransfer();
if (transfer != null) {
transfer.cancel();
}
t.interrupt();
Util.joinThread(t);
}
cleanup();
LOGGER.info("Trasfer worker exiting");
}
protected void cleanup() {
// By default, this does nothing
}
public boolean isComplete() {
return isComplete;
}
@Override
public boolean isCanceled() {
return isCancelled;
}
private void fireEvent(TransferEvent event) {
if (event.errorMessage != null) {
LOGGER.warn("(" + this.getClass().getSimpleName() + ") fireEvent with error: " + event.errorMessage);
}
synchronized (listeners) {
for (int i = listeners.size() - 1; i >= 0; --i) {
listeners.get(i).update(event);
}
}
}
protected void fireErrorMessage(String message) {
TransferEvent event = new TransferEvent(null, null, 0, 0, 0, message);
fireEvent(event);
}
protected abstract TransferEvent getTransferEvent();
public final void setMinConnections(int count) {
if (Session.getSatelliteConfig() != null
&& Session.getSatelliteConfig().isSetMaxConnectionsPerTransfer()
&& Session.getSatelliteConfig().getMaxConnectionsPerTransfer() > 0
&& Session.getSatelliteConfig().getMaxConnectionsPerTransfer() < count) {
count = Session.getSatelliteConfig().getMaxConnectionsPerTransfer();
}
synchronized (transfers) {
this.minConnectionCount = count;
}
}
@Override
public void addListener(TransferEventListener listener) {
synchronized (listeners) {
listeners.add(listener);
}
}
@Override
public void removeListener(TransferEventListener listener) {
synchronized (listeners) {
while (listeners.remove(listener)) {
}
}
}
public void cancel() {
final List<TransferThread> joiners;
synchronized (transfers) {
if (isCancelled)
return;
isCancelled = true;
joiners = new ArrayList<>();
joiners.addAll(transfers);
joiners.addAll(connectingTransfers);
}
for (TransferThread t : joiners) {
if (t.getTransfer() != null) {
t.getTransfer().cancel();
}
}
}
private final void ensureActivity() {
synchronized (transfers) {
if (isCancelled || isComplete)
return;
if (endgame && (!transfers.isEmpty() || !connectingTransfers.isEmpty()))
return;
Iterator<TransferThread> it = transfers.iterator();
while (it.hasNext()) {
if (!it.next().getTransfer().isValid()) {
it.remove();
}
}
if (transfers.size() + connectingTransfers.size() >= minConnectionCount)
return;
long now = System.currentTimeMillis();
int fails = consecutiveInitFails.get();
if (lastConnectionAttempt + fails * 10000 > now)
return;
lastConnectionAttempt = now;
TransferThread thread = createNewThread();
if (thread != null) {
thread.setDaemon(true);
connectingTransfers.add(thread);
thread.start();
}
}
}
protected abstract TransferThread createNewThread();
protected final void connectFailed(TransferThread thread) {
synchronized (transfers) {
connectingTransfers.remove(thread);
LOGGER.info("Establishing new transfer connection failed, [a:" + transfers.size() + "/c:"
+ connectingTransfers.size() + "]");
}
}
protected final void connectSucceeded(TransferThread thread) {
synchronized (transfers) {
connectingTransfers.remove(thread);
if (!isCancelled) {
transfers.add(thread);
LOGGER.info("Establishing new transfer connection succeeded, [a:" + transfers.size() + "/c:"
+ connectingTransfers.size() + "]");
return;
}
}
thread.getTransfer().cancel();
thread.interrupt();
}
protected final void transferEnded(TransferThread thread, boolean success) {
synchronized (transfers) {
transfers.remove(thread);
LOGGER.info("A transfer connection has finished (success=" + success + "), [a:" + transfers.size() + "/c:"
+ connectingTransfers.size() + "]");
if (endgame && !success && transfers.isEmpty()) {
// We had a transfer that reported success before, so we assume there are no more pending blocks
// not being actively transfered already. Only trigger a new upload if this was the last active
// transfer and it failed. This also resets endgame mode.
LOGGER.debug("Disabled endgame mode");
endgame = false;
} else if (!endgame && success) {
LOGGER.debug("Enabled endgame mode");
endgame = true;
}
if (success && transfers.isEmpty()) {
LOGGER.debug("Transfer might have finished, waiting for server");
lastConnectionAttempt = System.currentTimeMillis() + 3000; // Throttle reconnects
return; // Skip ensureActivity check
}
}
ensureActivity();
}
/**
* Get the number of consecutive connection fails. This counter is only
* increased if there is no active transfer running, and is reset as soon as
* one transfer successfully connects.
*
* @return connect fails
*/
public final int getFailCount() {
return consecutiveInitFails.get();
}
public int getTransferCount() {
synchronized (transfers) {
return transfers.size();
}
}
public File getFile() {
return localFile;
}
protected abstract static class TransferThread extends Thread {
@Override
public abstract void run();
protected abstract Transfer getTransfer();
public abstract long getCurrentSpeed();
}
}
|