package org.openslx.dozmod.thrift;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.openslx.bwlp.thrift.iface.SatelliteStatus;
import org.openslx.bwlp.thrift.iface.TAuthorizationException;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.dozmod.Config;
import org.openslx.dozmod.filetransfer.AsyncHashGenerator;
import org.openslx.dozmod.filetransfer.UploadTask;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.QuickTimer;

/**
 * Drives the start-up phase of an image version upload: requests an upload
 * token from the satellite (retrying periodically while the server reports
 * that it is busy), then creates and launches the {@link UploadTask} and
 * watches it until the first transfer is running or the attempt has failed.
 * <p>
 * State is advanced from {@link QuickTimer} tasks while callers poll it via
 * {@link #getState()}, so the fields shared between those threads are
 * {@code volatile} and every state transition refuses to leave
 * {@link UploadInitState#ERROR}.
 */
public class UploadInitiator {

	private static final Logger LOGGER = Logger.getLogger(UploadInitiator.class);

	/**
	 * Number of bytes (150 MiB) that must be free on the satellite in addition
	 * to the size of the file that is about to be uploaded.
	 */
	private static final long SIZE_CHECK_EXTRA_UL = 150L * 1024L * 1024L;

	/**
	 * Phases of the upload initialisation, in the order they are normally
	 * passed through. {@link #ERROR} is terminal and is entered through
	 * {@link UploadInitiator#cancelError()}.
	 */
	public enum UploadInitState {
		IDLE,
		REQUESTING,
		WAITING_FOR_SLOT,
		UPLOAD_STARTING,
		UPLOAD_STARTED,
		ERROR
	}

	private final File diskFile;
	private final long fileSize;
	private final String imageBaseId;
	private final ByteBuffer machineDescription;
	/** Hash generator for the disk file, or null if it could not be created. */
	private final AsyncHashGenerator hashGen;

	// Written by timer tasks, read by unsynchronized getters -> volatile.
	private volatile TransferInformation transferInformation = null;
	private volatile UploadTask uploadTask;
	private volatile UploadInitState initState = UploadInitState.IDLE;
	private volatile String errorMessage = null;

	/**
	 * GUI-BLOCKING. Prepares the upload of a new image version: verifies that
	 * the file is readable, queries the satellite status and checks that the
	 * reported free space is sufficient, and creates the hash generator.
	 * Nothing is transferred until {@link #startUpload(GotUploadTokenCallback)}
	 * is called.
	 *
	 * @param imageBaseId uuid of the image to upload a version of
	 * @param diskFile the file to upload
	 * @param machineDescription machine description that is passed on to the
	 *            satellite together with the upload request
	 * @throws WrappedException if the satellite status could not be queried
	 * @throws FileNotFoundException if {@code diskFile} is not readable
	 * @throws IOException if the satellite reports too little free space
	 */
	public UploadInitiator(final String imageBaseId, final File diskFile, final ByteBuffer machineDescription)
			throws WrappedException, IOException {
		if (!diskFile.canRead())
			throw new FileNotFoundException(diskFile.getName());
		this.fileSize = diskFile.length();
		SatelliteStatus status;
		try {
			status = ThriftManager.getSatClient().getStatus();
		} catch (TException e1) {
			throw new WrappedException(e1, "Konnte Status des Satelliten nicht abfragen!");
		}
		// A value of -1 disables the check (presumably "free space unknown" - confirm against the server).
		if (status.getAvailableStorageBytes() != -1
				&& status.getAvailableStorageBytes() < fileSize + SIZE_CHECK_EXTRA_UL) {
			throw new IOException("Nicht genügend Speicherplatz auf dem Satelliten.\n"
					+ "Löschen Sie nicht verwendete VMs oder kontaktieren Sie den Satelliten-Administrator.");
		}
		this.diskFile = diskFile;
		AsyncHashGenerator hg;
		try {
			hg = new AsyncHashGenerator(diskFile);
		} catch (NoSuchAlgorithmException e) {
			// The upload still works without hashes, so only warn.
			LOGGER.warn("Cannot instantiate hash generator: No error correction available!");
			hg = null;
		}
		this.hashGen = hg;
		this.machineDescription = machineDescription;
		this.imageBaseId = imageBaseId;
	}

	/**
	 * Starts the hash generator. Does nothing if no hash generator could be
	 * created in the constructor.
	 *
	 * @throws IllegalThreadStateException declared because the hash generator's
	 *             {@code start()} is invoked; presumably thrown when it was
	 *             already started - confirm against AsyncHashGenerator
	 */
	public synchronized void startHashing() throws IllegalThreadStateException {
		if (hashGen == null)
			return;
		hashGen.start();
	}

	/**
	 * Callback that is fired once the satellite has granted the upload and the
	 * transfer information (including the token) is available.
	 */
	public interface GotUploadTokenCallback {
		public void fire();
	}

	GotUploadTokenCallback gotTokenCallback = null;

	/**
	 * Starts requesting the upload from the satellite. The request is issued
	 * from a timer task and repeated every 15 seconds for as long as the
	 * server answers that it is busy.
	 *
	 * @param gotTokenCallback fired once the upload has been granted; may be null
	 * @throws IllegalStateException if this initiator is not idle any more
	 */
	public synchronized void startUpload(GotUploadTokenCallback gotTokenCallback) {
		if (initState != UploadInitState.IDLE)
			throw new IllegalStateException("Upload already started");
		this.gotTokenCallback = gotTokenCallback;
		initState = UploadInitState.REQUESTING;
		QuickTimer.scheduleAtFixedRate(startUploadInternal, 1, TimeUnit.SECONDS.toMillis(15));
	}

	/**
	 * Periodic task that asks the satellite for an upload token. It stops
	 * itself once the upload has been granted, has been rejected for good, or
	 * this initiator has been cancelled.
	 */
	private final QuickTimer.Task startUploadInternal = new QuickTimer.Task() {
		@Override
		public void fire() {
			LOGGER.debug("start upload internal");
			// Stop retrying as soon as we are cancelled (or otherwise past the request phase).
			final UploadInitState current = initState;
			if (current != UploadInitState.REQUESTING && current != UploadInitState.WAITING_FOR_SLOT) {
				this.cancel();
				return;
			}
			final TransferInformation granted;
			try {
				granted = ThriftManager.getSatClient().requestImageVersionUpload(
						Session.getSatelliteToken(), imageBaseId, fileSize, null, // TODO remove deprecated parameter
						machineDescription);
			} catch (TAuthorizationException e) {
				cancelWithErrorMessage("Upload vom Server verweigert");
				this.cancel();
				return;
			} catch (TTransferRejectedException e) {
				if (e.message != null && e.message.startsWith("Server busy")) {
					// Keep the timer running and retry, unless we were cancelled in the meantime.
					if (!advanceTo(UploadInitState.WAITING_FOR_SLOT)) {
						this.cancel();
					}
				} else {
					cancelWithErrorMessage("Upload-Anfrage gescheitert (" + e.message + ")");
					this.cancel();
				}
				return;
			} catch (TException e) {
				cancelWithErrorMessage("Upload-Anfrage gescheitert!");
				this.cancel();
				return;
			}
			// Publish the grant and check for cancellation atomically with respect to
			// cancelError(), so that exactly one side releases the upload on the server.
			final boolean cancelledMeanwhile;
			synchronized (UploadInitiator.this) {
				transferInformation = granted;
				cancelledMeanwhile = (initState == UploadInitState.ERROR);
			}
			if (cancelledMeanwhile) {
				// cancelError() ran while the request was in flight and could not know the token yet.
				LOGGER.debug("Upload was cancelled while requesting it, releasing granted upload");
				cancelRemoteUpload(granted.token);
				this.cancel();
				return;
			}
			// Everything worked out so far
			LOGGER.info("Version upload granted, versionId: " + granted.toString());
			if (gotTokenCallback != null) {
				gotTokenCallback.fire();
			}
			if (hashGen != null) {
				hashGen.setUploadToken(granted.token);
			}
			if (advanceTo(UploadInitState.UPLOAD_STARTING)) {
				QuickTimer.scheduleAtFixedRate(launchUploadTask, 1, 100);
			}
			// Otherwise cancelError() ran after the grant was published and has already
			// released it on the server; either way this task is finished.
			this.cancel();
		}
	};

	/**
	 * Periodic task that creates and starts the {@link UploadTask} on its first
	 * run and then polls it until the first transfer is running (state becomes
	 * {@link UploadInitState#UPLOAD_STARTED}) or the connection attempt has
	 * failed (state becomes {@link UploadInitState#ERROR}).
	 */
	private final QuickTimer.Task launchUploadTask = new QuickTimer.Task() {
		@Override
		public void fire() {
			LOGGER.debug("launch upload task");
			if (initState != UploadInitState.UPLOAD_STARTING) {
				this.cancel();
				return;
			}
			UploadTask task = uploadTask;
			if (task == null) {
				// do actually start the upload now
				LOGGER.debug("Starting upload for: " + diskFile.getName());
				try {
					task = new UploadTask(Session.getSatelliteAddress(), transferInformation.getPlainPort(),
							transferInformation.getToken(), diskFile);
				} catch (FileNotFoundException e) {
					cancelWithErrorMessage("Kann VM nicht hochladen: Datei nicht gefunden\n\n"
							+ diskFile.getAbsolutePath());
					this.cancel();
					return;
				}
				uploadTask = task;
				task.setMinConnections(Config.getTransferConnectionCount());
				Thread uploadThread = new Thread(task);
				uploadThread.setDaemon(true);
				uploadThread.start();
			}
			if (task.getFailCount() == 0 && task.getTransferCount() == 0 && !task.isCanceled()) {
				// Still initializing, wait...
				return;
			}
			if (!task.isComplete() && task.getTransferCount() == 0) {
				// Init failed
				cancelWithErrorMessage("Aufbau der Verbindung zum Hochladen fehlgeschlagen");
				this.cancel();
				return;
			}
			// Init succeeded - but never overwrite a cancellation that happened meanwhile
			advanceTo(UploadInitState.UPLOAD_STARTED);
			this.cancel();
		}
	};

	/**
	 * Moves to the given state unless this initiator has already been
	 * cancelled.
	 *
	 * @param next state to enter
	 * @return true if the state was changed, false if the current state is
	 *         {@link UploadInitState#ERROR} and was left untouched
	 */
	private synchronized boolean advanceTo(UploadInitState next) {
		if (initState == UploadInitState.ERROR)
			return false;
		initState = next;
		return true;
	}

	/**
	 * Asks the satellite to cancel the upload belonging to the given token.
	 * This is best effort: a failure is logged and otherwise ignored.
	 *
	 * @param token upload token received from the satellite
	 */
	private static void cancelRemoteUpload(String token) {
		try {
			ThriftManager.getSatClient().cancelUpload(token);
		} catch (Exception e) {
			// Nothing sensible can be done here; we are already tearing down.
			LOGGER.debug("Could not cancel upload on satellite", e);
		}
	}

	/**
	 * Set state to cancelled and clean up: cancels the hash generator and the
	 * upload task if present, and tells the satellite to cancel the upload if
	 * it has already been granted. Calling this more than once has no further
	 * effect.
	 */
	public synchronized void cancelError() {
		if (initState == UploadInitState.ERROR)
			return;
		initState = UploadInitState.ERROR;
		if (hashGen != null) {
			hashGen.cancel();
		}
		final UploadTask task = uploadTask;
		if (task != null) {
			task.cancel();
		}
		final TransferInformation ti = transferInformation;
		if (ti != null) {
			cancelRemoteUpload(ti.token);
		}
	}

	/**
	 * @return the current phase of the upload initialisation
	 */
	public UploadInitState getState() {
		return initState;
	}

	/**
	 * @return the message passed to the most recent
	 *         {@link #cancelWithErrorMessage(String)} call, or null if there
	 *         was none
	 */
	public String getErrorMessage() {
		return errorMessage;
	}

	/**
	 * Stores the given error message, logs it as a warning if it differs from
	 * the one stored before, and cancels via {@link #cancelError()}.
	 *
	 * @param errorMessage message describing why the upload is cancelled
	 */
	public void cancelWithErrorMessage(String errorMessage) {
		// Null-safe: a missing message must not prevent the cancellation below.
		if (errorMessage != null && !errorMessage.equals(this.errorMessage)) {
			LOGGER.warn(errorMessage);
		}
		this.errorMessage = errorMessage;
		cancelError();
	}

	/**
	 * @return the hash generator for the disk file, or null if none could be
	 *         created
	 */
	public AsyncHashGenerator getHasher() {
		return hashGen;
	}

	/**
	 * @return the upload task, or null if it has not been created yet
	 */
	public UploadTask getUploadTask() {
		return uploadTask;
	}

	/**
	 * @return the upload token granted by the satellite, or null if no upload
	 *         has been granted yet
	 */
	public String getToken() {
		final TransferInformation ti = transferInformation;
		return ti == null ? null : ti.token;
	}

}