summaryrefslogblamecommitdiffstats
path: root/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java
blob: 06736619908fe3a08b452faf278b642c061191af (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                                        
 
                    
                                     
                                                 
 

                                

                                           
                                                            
                                                         

                                                    
                                 
                                         

                                                     
                                              
 

                                                                          
             
   
                                              
 
           
                                          
           
                                                                                    


                                                                           



                                                                           
                                                                                             
 
                                  
                                         


                                        
                                     
                                                      













                                                                            
 
                                                                                               



                                                          
                                  
                                 


                                              
                                                            

         
                                                           




                                                                       
                                                                
                                              
                                                 
 

                                   







                                                                 
















                                                                                                                     
                             
                                                                                                                     
                                               
































                                                                                                                            
                                                                       

                                                    
                         



                                                                                                                     
                                                   
                                                              

                                                            


                                                                            


                                                                                    

                                                                                  
                                                                                          



                                                                                                               
                                                                 

                                                                 
                                 


                                                                         
                                                                  

                                 
                                  
                                                               

                                                            
                                                                       
                                                                                          
                                                                                        
                                                                      
                                 

                                                                       

                                                 
 



                                                    
                         
                 


                                                  
                                        
                 
         

                                          
                                      
                                    
 

                                                  








                                                                  



                                                                                            





                                                                                                                          

                                                                                                         
                                                                                           
                                                                             

                         





                                                                  
                 







                                                                                                                     
                                 

                                                                                        
                                                                                                                                              
                         
                 

                                                                                                                                
                 

                                                                                                                 
                             

         


                                                    
         



                                                                   
 
package org.openslx.dozmod.filetransfer;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.dozmod.Config;
import org.openslx.filetransfer.Transfer;
import org.openslx.filetransfer.UploadStatusCallback;
import org.openslx.filetransfer.Uploader;
import org.openslx.thrifthelper.ThriftManager;

/**
 * Executes the file upload in a background thread and updates progress to
 * listeners.
 */
public class UploadTask extends TransferTask {

	/**
	 * Logger instance for this class.
	 */
	private final static Logger LOGGER = LogManager.getLogger(UploadTask.class);

	private static final AtomicInteger THREAD_ID = new AtomicInteger();
	
	/**
	 * Update interval of the block progress (needs thrift call to sat)
	 */
	private static final double THRIFT_INTERVAL_SECONDS = 1.7;
	private static final int THRIFT_INTERVAL_MS = (int) (THRIFT_INTERVAL_SECONDS * 1000);

	private final String host;
	private final String uploadToken;
	private final int portPlain;
	private final int portSsl;
	private final SSLContext sslCtx;
	private final long startTime;
	private String transferConnectionError = null;
	
	/**
	 * Keep track of the number of active upload connections
	 */
	private static AtomicInteger numConnections = new AtomicInteger();
	
	/**
	 * Get the number of active upload workers. This counts individual
	 * upload connections, not logical uploads which might use more than
	 * one connection at a time.
	 */
	public static int getNumberOfUploads() {
		return numConnections.get();
	}

	public UploadTask(String host, TransferInformation ti, SSLContext ctx, File uploadFile)
			throws FileNotFoundException {
		super(uploadFile, uploadFile.length());
		if (!uploadFile.canRead())
			throw new FileNotFoundException();
		this.sslCtx = ctx;
		this.host = host;
		this.portPlain = ti.plainPort;
		this.portSsl = ti.sslPort;
		this.uploadToken = ti.token;
		this.startTime = System.currentTimeMillis();
	}

	private class UploadThread extends TransferThread {

		public UploadThread() {
			super("UpConn#" + THREAD_ID.incrementAndGet());
		}

		//		private long totalBytesRead = 0;
		private long currentSpeed = 0;
		private Uploader uploader = null;

		@Override
		public void run() {
			numConnections.incrementAndGet();
			try {
				run2();
			} finally {
				numConnections.decrementAndGet();
			}
		}
		
		private Exception initPlain(Exception ex) {
			if (portPlain <= 0 || portPlain > 65535)
				return ex;
			LOGGER.info("Establishing plain upload connection to " + host + ":" + portPlain);
			try {
				uploader = new Uploader(host, portPlain, Config.TRANSFER_TIMEOUT, null, uploadToken);
			} catch (Exception e) {
				LOGGER.info("Connection failed");
				return e;
			}
			return null;
		}
		
		private Exception initSsl(Exception ex) {
			if (portSsl <= 0 || portSsl > 65535 || sslCtx == null)
				return ex;
			LOGGER.info("Establishing SSL upload connection to " + host + ":" + portSsl);
			try {
				uploader = new Uploader(host, portSsl, Config.TRANSFER_TIMEOUT, sslCtx, uploadToken);
			} catch (Exception e) {
				LOGGER.info("Connection failed");
				return e;
			}
			return null;
		}

		public void run2() {
			Exception ex = null;
			switch (Config.getFileTransferMode()) {
			case SSL:
				ex = initSsl(ex);
				if (uploader == null) {
					ex = initPlain(ex);
				}
				break;
			case SSL_ONLY:
				ex = initSsl(ex);
				break;
			case PLAIN:
			default:
				ex = initPlain(ex);
				if (uploader == null) {
					ex = initSsl(ex);
				}
				break;
			}
			if (uploader == null) {
				if (ex == null) {
					LOGGER.warn("Could not initialize new uploader because neither plain"
							+ " nor SSL transfer data is given");
				} else {
					LOGGER.warn("Could not initialize new uploader, all connection methods failed", ex);
				}
				consecutiveInitFails.incrementAndGet();
				connectFailed(this);
				return;
			}
			connectSucceeded(this);
			final UploadThread thread = this;

			final boolean ret = uploader.upload(localFile.getAbsolutePath(), new UploadStatusCallback() {
				// progress counter
				private long currentBytes = 0;
				private long lastUpdate = 0;
				private long lastBytes = 0;

				@Override
				public void uploadProgress(long bytesSent) {
					currentBytes += bytesSent;
					final long now = System.currentTimeMillis();
					if (lastUpdate + UPDATE_INTERVAL_MS < now) {
						synchronized (thread) {
							// Calculate updated speed
							// totalBytesRead += currentBytes;
							lastBytes = (lastBytes * 2 + currentBytes) / 3;
							currentSpeed = (1000 * lastBytes) / (now - lastUpdate);
							lastUpdate = now;
						}
						// Reset counters
						currentBytes = 0;
					}
				}

				@Override
				public void uploadError(String message) {
					fireErrorMessage(message);
				}
			});
			if (ret) {
				transferConnectionError = null;
				consecutiveInitFails.set(0);
			} else {
				String err = uploader.getRemoteError();
				if (err != null && !err.equals(transferConnectionError)) {
					LOGGER.warn("Upload task remote error: " + err);
					transferConnectionError = err;
				}
				consecutiveInitFails.incrementAndGet();
			}
			transferEnded(this, ret);
		}

		@Override
		public long getCurrentSpeed() {
			synchronized (this) {
				return currentSpeed;
			}
		}

		@Override
		protected Transfer getTransfer() {
			return uploader;
		}
	}

	private long lastThriftUpdate = 0;
	private long virtualSpeed = 0;
	private long nextQueryDebug;

	private AtomicInteger hashCompleteCounter;

	@Override
	protected TransferEvent getTransferEvent() {
		final long now = System.currentTimeMillis();
		TransferState state = null;
		byte[] blocks = null;
		String error = null;
		if (lastThriftUpdate + THRIFT_INTERVAL_MS < now) {
			lastThriftUpdate = now;
			try {
				if (System.currentTimeMillis() > nextQueryDebug) {
					nextQueryDebug = System.currentTimeMillis() + 30000;
					LOGGER.debug("Querying upload status...");
				}
				TransferStatus uploadStatus = ThriftManager.getSatClient().queryUploadStatus(uploadToken);
				state = uploadStatus.getState();
				blocks = uploadStatus.getBlockStatus();
			} catch (TInvalidTokenException e) {
				error = "Upload token unknown!?";
				state = TransferState.ERROR;
				LOGGER.warn("Cannot query upload status: Token not known by the server");
			} catch (Exception e) {
				error = "Exception quering upload status: " + e.toString();
				LOGGER.warn("Cannot query upload status", e);
			}
		}
		long speed = 0;
		long timeRemaining = 0;
		synchronized (transfers) {
			for (TransferThread thread : transfers) {
				speed += thread.getCurrentSpeed();
			}
		}
		if (speed != 0) {
			// 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying, 5 = hashing
			if (blocks != null) {
				int missing = 0;
				for (byte b : blocks) {
					if (b != 0) {
						missing++;
					}
				}
				final long bytesRemaining = CHUNK_SIZE * (long) missing;
				timeRemaining = (1000 * bytesRemaining) / (speed + 1);
				virtualSpeed = ((blocks.length - missing) * CHUNK_SIZE * 1000) / (System.currentTimeMillis() - startTime + 1);
			}
		}
		if (transferConnectionError != null && (error == null || transferConnectionError.equals("Out of disk space"))) {
			error = transferConnectionError;
		}
		TransferEvent event = new TransferEvent(state, blocks, speed, virtualSpeed, timeRemaining, error,
				hashCompleteCounter.get());
		return event;
	}

	@Override
	protected TransferThread createNewThread() {
		return new UploadThread();
	}

	public void setHashCounter(AtomicInteger completeCounter) {
		this.hashCompleteCounter = completeCounter;
	}
}