|
|
package org.openslx.dozmod.thrift;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.openslx.bwlp.thrift.iface.SatelliteStatus;
import org.openslx.bwlp.thrift.iface.TAuthorizationException;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.dozmod.Config;
import org.openslx.dozmod.filetransfer.AsyncHashGenerator;
import org.openslx.dozmod.filetransfer.UploadTask;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.QuickTimer;
/**
 * Drives the start-up phase of an image version upload to the satellite server.
 * <p>
 * The constructor validates the file and the satellite's free storage. {@link #startUpload}
 * then requests an upload token via a periodic {@link QuickTimer} task (re-trying while the
 * server answers "Server busy"), and a second periodic task launches the {@link UploadTask}
 * and watches it until the first transfer succeeded or failed.
 * <p>
 * State is modified by the timer tasks and by callers of {@link #cancelError(String)}; once
 * the state is {@link UploadInitState#ERROR} it is never left again.
 */
public class UploadInitiator {

    private static final Logger LOGGER = LogManager.getLogger(UploadInitiator.class);

    /** Additional free space (150 MiB) the satellite must report on top of the file size. */
    private static final long SIZE_CHECK_EXTRA_UL = 150L * 1024L * 1024L;

    /** Phases of the upload initialization. */
    public enum UploadInitState {
        /** {@link UploadInitiator#startUpload} has not been called yet. */
        IDLE,
        /** The upload request is being sent to the satellite. */
        REQUESTING,
        /** The satellite rejected the request with "Server busy"; the request is repeated. */
        WAITING_FOR_SLOT,
        /** A token was granted; the upload task is being launched. */
        UPLOAD_STARTING,
        /** The upload task reported at least one transfer (or completion). */
        UPLOAD_STARTED,
        /** Initialization failed or was cancelled; terminal state. */
        ERROR
    }

    private final File diskFile;
    private final long fileSize;
    private final String imageBaseId;
    private final ByteBuffer machineDescription;
    /** Hash generator for the file, or null if it could not be instantiated. */
    private final AsyncHashGenerator hashGen;

    // The following fields are written by the QuickTimer tasks / cancelError() and read
    // through the unsynchronized getters, hence volatile.
    private volatile TransferInformation transferInformation = null;
    private volatile UploadTask uploadTask;
    private volatile UploadInitState initState = UploadInitState.IDLE;
    private volatile String errorMessage = null;

    /**
     * GUI-BLOCKING: Prepare the upload of an image version. Queries the satellite status
     * (a blocking remote call) and verifies that enough storage is available. No upload
     * is requested yet; call {@link #startUpload(GotUploadTokenCallback)} for that.
     *
     * @param imageBaseId uuid of the image to upload a version of
     * @param diskFile the file to upload
     * @param machineDescription machine description passed along with the upload request
     * @throws WrappedException if the satellite status could not be queried
     * @throws FileNotFoundException if the file is not readable
     * @throws IOException if the satellite reports less free storage than the file size plus
     *             a safety margin (a reported value of -1 disables this check)
     */
    public UploadInitiator(final String imageBaseId, final File diskFile, final ByteBuffer machineDescription)
            throws WrappedException, IOException {
        if (!diskFile.canRead()) {
            throw new FileNotFoundException(diskFile.getName());
        }
        this.fileSize = diskFile.length();
        SatelliteStatus status;
        try {
            status = ThriftManager.getSatClient().getStatus();
        } catch (TException e1) {
            throw new WrappedException(e1, "Konnte Status des Satelliten nicht abfragen!");
        }
        if (status.getAvailableStorageBytes() != -1
                && status.getAvailableStorageBytes() < fileSize + SIZE_CHECK_EXTRA_UL) {
            throw new IOException("Nicht genügend Speicherplatz auf dem Satelliten.\n"
                    + "Löschen Sie nicht verwendete VMs oder kontaktieren Sie den Satelliten-Administrator.");
        }
        this.diskFile = diskFile;
        AsyncHashGenerator hg;
        try {
            hg = new AsyncHashGenerator(diskFile);
        } catch (NoSuchAlgorithmException e) {
            // Upload still works without hashing, so only warn (but keep the cause in the log)
            LOGGER.warn("Cannot instantiate hash generator: No error correction available!", e);
            hg = null;
        }
        this.hashGen = hg;
        this.machineDescription = machineDescription;
        this.imageBaseId = imageBaseId;
    }

    /**
     * Start the hash generator, if one could be created. Does nothing otherwise.
     *
     * @throws IllegalThreadStateException propagated from the hash generator's start();
     *             presumably when it was already started - verify against AsyncHashGenerator
     */
    public synchronized void startHashing() throws IllegalThreadStateException {
        if (hashGen == null) {
            return;
        }
        hashGen.start();
    }

    /** Callback fired once the satellite granted an upload token. */
    public interface GotUploadTokenCallback {
        public void fire();
    }

    /** Callback passed to {@link #startUpload}; may be null. */
    GotUploadTokenCallback gotTokenCallback = null;

    /**
     * Begin requesting an upload token from the satellite. The request runs on a
     * {@link QuickTimer} task that is repeated every 15 seconds until it is granted,
     * finally rejected, or the upload is cancelled.
     *
     * @param gotTokenCallback fired after a token was granted; may be null
     * @throws IllegalStateException if this initiator is not in state IDLE anymore
     */
    public synchronized void startUpload(GotUploadTokenCallback gotTokenCallback) {
        if (initState != UploadInitState.IDLE) {
            throw new IllegalStateException("Upload already started");
        }
        this.gotTokenCallback = gotTokenCallback;
        initState = UploadInitState.REQUESTING;
        QuickTimer.scheduleAtFixedRate(startUploadInternal, 1, TimeUnit.SECONDS.toMillis(15));
    }

    /**
     * Move to the given state unless the upload has already been cancelled.
     * Synchronized on the same monitor as {@link #cancelError(String)}, so a
     * concurrent cancel can never be overwritten by a later state.
     *
     * @param next state to enter
     * @return true if the state was changed, false if the state is ERROR and was kept
     */
    private synchronized boolean advanceState(UploadInitState next) {
        if (initState == UploadInitState.ERROR) {
            return false;
        }
        initState = next;
        return true;
    }

    /**
     * Best-effort: tell the satellite to drop the transfer belonging to the given token.
     * Failures are logged only, since this is called while already cleaning up.
     */
    private static void cancelOnSatellite(String token) {
        try {
            ThriftManager.getSatClient().cancelUpload(token);
        } catch (Exception e) {
            LOGGER.warn("Could not cancel upload on satellite", e);
        }
    }

    /**
     * Periodic task requesting the upload token. Keeps running while the satellite
     * answers "Server busy"; cancels itself on success, on final rejection, and when the
     * upload was cancelled in the meantime.
     */
    private QuickTimer.Task startUploadInternal = new QuickTimer.Task() {
        @Override
        public void fire() {
            LOGGER.debug("start upload internal");
            // Do not request (again) if the upload was cancelled or already moved on
            final UploadInitState stateBefore = initState;
            if (stateBefore != UploadInitState.REQUESTING
                    && stateBefore != UploadInitState.WAITING_FOR_SLOT) {
                LOGGER.info("Upload state is " + stateBefore + ". Stopping upload request timer.");
                this.cancel();
                return;
            }
            final TransferInformation granted;
            try {
                granted = ThriftManager.getSatClient().requestImageVersionUpload(
                        Session.getSatelliteToken(), imageBaseId, fileSize, null, // TODO remove deprecated parameter
                        machineDescription);
            } catch (TAuthorizationException e) {
                LOGGER.warn("Upload request not authorized", e);
                cancelWithGuiErrorMessage("Upload vom Server verweigert");
                this.cancel();
                return;
            } catch (TTransferRejectedException e) {
                if (e.message != null && e.message.startsWith("Server busy")) {
                    // Keep the timer running to retry, unless we got cancelled meanwhile
                    if (!advanceState(UploadInitState.WAITING_FOR_SLOT)) {
                        this.cancel();
                    }
                } else {
                    cancelWithGuiErrorMessage("Upload-Anfrage gescheitert (" + e.message + ")");
                    this.cancel();
                }
                return;
            } catch (TException e) {
                LOGGER.warn("Upload request failed", e);
                cancelWithGuiErrorMessage("Upload-Anfrage gescheitert!");
                this.cancel();
                return;
            }
            // Publish the token first, then look at the state: either we see the ERROR
            // state here, or cancelError() sees the token - the transfer is released either way.
            transferInformation = granted;
            if (initState == UploadInitState.ERROR) {
                // Cancelled while the request was in flight; cancelError() could not know the token
                LOGGER.info("Upload was cancelled while requesting; releasing granted transfer.");
                cancelOnSatellite(granted.token);
                this.cancel();
                return;
            }
            // Everything worked out so far
            LOGGER.info("Version upload granted, versionId: " + granted.toString());
            if (gotTokenCallback != null) {
                gotTokenCallback.fire();
            }
            if (hashGen != null) {
                hashGen.setUploadToken(granted.token);
            }
            if (!advanceState(UploadInitState.UPLOAD_STARTING)) {
                // cancelError() ran after the token was published and has cleaned up already
                this.cancel();
                return;
            }
            QuickTimer.scheduleAtFixedRate(launchUploadTask, 1, 100);
            this.cancel();
        }
    };

    /**
     * Periodic watchdog that creates and starts the {@link UploadTask} on its first run
     * and then polls it until the initial connection attempt has a result. On success the
     * state becomes UPLOAD_STARTED, on failure the upload is cancelled with an error
     * message. The task cancels itself in both cases and whenever the state is no longer
     * UPLOAD_STARTING.
     */
    private QuickTimer.Task launchUploadTask = new QuickTimer.Task() {
        @Override
        public void fire() {
            LOGGER.debug("launch upload task");
            if (initState != UploadInitState.UPLOAD_STARTING) {
                LOGGER.info("Huh. Upload state is " + initState + ". Stopping init watchdog.");
                this.cancel();
                return;
            }
            if (uploadTask == null) {
                // do actually start the upload now
                LOGGER.debug("Starting upload for: " + diskFile.getName());
                try {
                    uploadTask = new UploadTask(Session.getSatelliteAddress(), transferInformation,
                            ThriftManager.getSatelliteSslContext(), diskFile);
                } catch (FileNotFoundException e) {
                    cancelWithGuiErrorMessage("Kann VM nicht hochladen: Datei nicht gefunden\n\n"
                            + diskFile.getAbsolutePath());
                    this.cancel();
                    return;
                }
                uploadTask.setMinConnections(Config.getTransferConnectionCount());
                if (hashGen != null) {
                    uploadTask.setHashCounter(hashGen.getCompleteCounter());
                }
                Thread uploadThread = new Thread(uploadTask, "UploadTask");
                uploadThread.setDaemon(true);
                uploadThread.start();
            }
            if (uploadTask.getFailCount() == 0 && uploadTask.getTransferCount() == 0
                    && !uploadTask.isCanceled()) {
                // Still initializing, wait...
                return;
            }
            if (!uploadTask.isComplete() && uploadTask.getTransferCount() == 0) {
                // Init failed
                cancelWithGuiErrorMessage("Aufbau der Verbindung zum Hochladen fehlgeschlagen");
                this.cancel();
                return;
            }
            // Init succeeded (unless a concurrent cancel won the race; ERROR is kept then)
            if (advanceState(UploadInitState.UPLOAD_STARTED)) {
                LOGGER.info("Upload initiated");
            }
            this.cancel();
        }
    };

    /**
     * Set state to ERROR and clean up: cancels the hash generator, the upload task and,
     * if a token was already granted, the transfer on the satellite (best effort).
     * Does nothing if the state already is ERROR. Given reason goes to log only.
     *
     * @param reason why the upload is cancelled, for logging
     */
    public synchronized void cancelError(String reason) {
        if (initState == UploadInitState.ERROR) {
            return;
        }
        LOGGER.info("Upload was cancelled while in state " + initState + ": " + reason);
        initState = UploadInitState.ERROR;
        if (hashGen != null) {
            hashGen.cancel();
        }
        final UploadTask task = uploadTask;
        if (task != null) {
            task.cancel();
        }
        final TransferInformation ti = transferInformation;
        if (ti != null) {
            cancelOnSatellite(ti.token);
        }
    }

    /**
     * @return current phase of the upload initialization
     */
    public UploadInitState getState() {
        return initState;
    }

    /**
     * @return message passed to {@link #cancelWithGuiErrorMessage(String)}, or null if
     *         that method was never called
     */
    public String getErrorMessage() {
        return errorMessage;
    }

    /**
     * Cancel upload and remember the given error message so it can be retrieved via
     * {@link #getErrorMessage()}, e.g. for display in the GUI transfer activity.
     *
     * @param errorMessage message describing the failure
     */
    public void cancelWithGuiErrorMessage(String errorMessage) {
        this.errorMessage = errorMessage;
        cancelError("cancelWithErrorMessage: " + errorMessage);
    }

    /**
     * @return the hash generator for the file, or null if none could be created
     */
    public AsyncHashGenerator getHasher() {
        return hashGen;
    }

    /**
     * @return the upload task, or null if it has not been created yet
     */
    public UploadTask getUploadTask() {
        return uploadTask;
    }

    /**
     * @return the upload token granted by the satellite, or null if none was received yet
     */
    public String getToken() {
        final TransferInformation ti = transferInformation;
        return ti == null ? null : ti.token;
    }
}
|