package org.openslx.dozmod.filetransfer;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.dozmod.Config;
import org.openslx.filetransfer.Transfer;
import org.openslx.filetransfer.UploadStatusCallback;
import org.openslx.filetransfer.Uploader;
import org.openslx.thrifthelper.ThriftManager;
/**
* Executes the file upload in a background thread and updates progress to
* listeners.
*/
public class UploadTask extends TransferTask {
/**
* Logger instance for this class.
*/
private final static Logger LOGGER = LogManager.getLogger(UploadTask.class);
private static final AtomicInteger THREAD_ID = new AtomicInteger();
/**
* Update interval of the block progress (needs thrift call to sat)
*/
private static final double THRIFT_INTERVAL_SECONDS = 1.7;
private static final int THRIFT_INTERVAL_MS = (int) (THRIFT_INTERVAL_SECONDS * 1000);
private final String host;
private final int port;
private final String uploadToken;
private final long startTime;
private String transferConnectionError = null;
/**
* Keep track of the number of active upload connections
*/
private static AtomicInteger numConnections = new AtomicInteger();
/**
* Get the number of active upload workers. This counts individual
* upload connections, not logical uploads which might use more than
* one connection at a time.
*/
public static int getNumberOfUploads() {
return numConnections.get();
}
public UploadTask(String host, int port, String uploadToken, File uploadFile)
throws FileNotFoundException {
super(uploadFile, uploadFile.length());
if (!uploadFile.canRead())
throw new FileNotFoundException();
// TODO: SSL
this.host = host;
this.port = port;
this.uploadToken = uploadToken;
this.startTime = System.currentTimeMillis();
}
private class UploadThread extends TransferThread {
public UploadThread() {
super("UpConn#" + THREAD_ID.incrementAndGet());
}
// private long totalBytesRead = 0;
private long currentSpeed = 0;
private Uploader uploader = null;
@Override
public void run() {
numConnections.incrementAndGet();
try {
run2();
} finally {
numConnections.decrementAndGet();
}
}
public void run2() {
try {
uploader = new Uploader(host, port, Config.TRANSFER_TIMEOUT, null, uploadToken);
} catch (Exception e) {
LOGGER.warn("Could not initialize new uploader", e);
consecutiveInitFails.incrementAndGet();
connectFailed(this);
return;
} // TODO: SSL
connectSucceeded(this);
final UploadThread thread = this;
final boolean ret = uploader.upload(localFile.getAbsolutePath(), new UploadStatusCallback() {
// progress counter
private long currentBytes = 0;
private long lastUpdate = 0;
private long lastBytes = 0;
@Override
public void uploadProgress(long bytesSent) {
currentBytes += bytesSent;
final long now = System.currentTimeMillis();
if (lastUpdate + UPDATE_INTERVAL_MS < now) {
synchronized (thread) {
// Calculate updated speed
// totalBytesRead += currentBytes;
lastBytes = (lastBytes * 2 + currentBytes) / 3;
currentSpeed = (1000 * lastBytes) / (now - lastUpdate);
lastUpdate = now;
}
// Reset counters
currentBytes = 0;
}
}
@Override
public void uploadError(String message) {
fireErrorMessage(message);
}
});
if (ret) {
transferConnectionError = null;
consecutiveInitFails.set(0);
} else {
String err = uploader.getRemoteError();
if (err != null && !err.equals(transferConnectionError)) {
LOGGER.warn("Upload task remote error: " + err);
transferConnectionError = err;
}
consecutiveInitFails.incrementAndGet();
}
transferEnded(this, ret);
}
@Override
public long getCurrentSpeed() {
synchronized (this) {
return currentSpeed;
}
}
@Override
protected Transfer getTransfer() {
return uploader;
}
}
private long lastThriftUpdate = 0;
private long virtualSpeed = 0;
private long nextQueryDebug;
@Override
protected TransferEvent getTransferEvent() {
final long now = System.currentTimeMillis();
TransferState state = null;
byte[] blocks = null;
String error = null;
if (lastThriftUpdate + THRIFT_INTERVAL_MS < now) {
lastThriftUpdate = now;
try {
if (System.currentTimeMillis() > nextQueryDebug) {
nextQueryDebug = System.currentTimeMillis() + 30000;
LOGGER.debug("Querying upload status...");
}
TransferStatus uploadStatus = ThriftManager.getSatClient().queryUploadStatus(uploadToken);
state = uploadStatus.getState();
blocks = uploadStatus.getBlockStatus();
} catch (TInvalidTokenException e) {
error = "Upload token unknown!?";
state = TransferState.ERROR;
LOGGER.warn("Cannot query upload status: Token not known by the server");
} catch (Exception e) {
error = "Exception quering upload status: " + e.toString();
LOGGER.warn("Cannot query upload status", e);
}
}
long speed = 0;
long timeRemaining = 0;
synchronized (transfers) {
for (TransferThread thread : transfers) {
speed += thread.getCurrentSpeed();
}
}
if (speed != 0) {
// 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying, 5 = hashing
if (blocks != null) {
int missing = 0;
for (byte b : blocks) {
if (b != 0) {
missing++;
}
}
final long bytesRemaining = CHUNK_SIZE * (long) missing;
timeRemaining = (1000 * bytesRemaining) / (speed + 1);
virtualSpeed = ((blocks.length - missing) * CHUNK_SIZE * 1000) / (System.currentTimeMillis() - startTime + 1);
}
}
if (transferConnectionError != null && (error == null || transferConnectionError.equals("Out of disk space"))) {
error = transferConnectionError;
}
TransferEvent event = new TransferEvent(state, blocks, speed, virtualSpeed, timeRemaining, error);
return event;
}
@Override
protected TransferThread createNewThread() {
return new UploadThread();
}
}