summaryrefslogblamecommitdiffstats
path: root/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java
blob: 84b9b47a077ff7e7edbd6ada876d0c9a638fca64 (plain) (tree)
1
2
3
4
5
6
7
8
                                        
 
                    
                                     
                                                 
 

                                           


                                                            
                                 
                                         

                                                     
                                              
 

                                                                          
             
   
                                              
 
           
                                          
           
                                                                                    


                                                                           



                                                                           
                                                                                             
 


                                         
                                     
                                                      













                                                                            
 




                                                                                     
                            

                                 
                                               
                                                            

         
                                                           




                                                                       
                                                                
                                              
                                                 
 

                                   








                                                                 



                                                                                                                
                                                                       






                                                                                                                     
                                                   
                                                              

                                                            


                                                                            


                                                                                    

                                                                                  
                                                                                          



                                                                                                               
                                                                 

                                                                 
                                 


                                                                         
                                                                  

                                 
                                  
                                                               

                                                            
                                                                       
                                                                                          
                                                                                        
                                                                      
                                 

                                                                       

                                                 
 



                                                    
                         
                 


                                                  
                                        
                 
         

                                          
                                      
                                    









                                                                  



                                                                                            





                                                                                                                          

                                                                                                         
                                                                                           
                                                                             

                         





                                                                  
                 







                                                                                                                     
                                 

                                                                                        
                                                                                                                                              
                         
                 

                                                                                                                                
                 
                                                                                                                  
                             

         


                                                    
         
 
package org.openslx.dozmod.filetransfer;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.dozmod.Config;
import org.openslx.filetransfer.Transfer;
import org.openslx.filetransfer.UploadStatusCallback;
import org.openslx.filetransfer.Uploader;
import org.openslx.thrifthelper.ThriftManager;

/**
 * Executes the file upload in a background thread and updates progress to
 * listeners.
 * <p>
 * Each upload connection is handled by an {@link UploadThread}. Block-level
 * progress is not tracked locally; it is polled from the satellite server via
 * thrift in {@link #getTransferEvent()}.
 */
public class UploadTask extends TransferTask {

	/**
	 * Logger instance for this class.
	 */
	private static final Logger LOGGER = LogManager.getLogger(UploadTask.class);

	/**
	 * Source of sequence numbers used to name upload connection threads.
	 */
	private static final AtomicInteger THREAD_ID = new AtomicInteger();

	/**
	 * Update interval of the block progress (needs thrift call to sat)
	 */
	private static final double THRIFT_INTERVAL_SECONDS = 1.7;
	private static final int THRIFT_INTERVAL_MS = (int) (THRIFT_INTERVAL_SECONDS * 1000);

	private final String host;
	private final int port;
	private final String uploadToken;
	/** Wall-clock time (ms) at which this task was constructed. */
	private final long startTime;

	/**
	 * Most recent remote error reported by a failed upload connection, or
	 * <code>null</code> if the last finished connection succeeded. Written by
	 * upload threads and read by {@link #getTransferEvent()}, hence volatile.
	 */
	private volatile String transferConnectionError = null;

	/**
	 * Keep track of the number of active upload connections
	 */
	private static final AtomicInteger numConnections = new AtomicInteger();

	/**
	 * Get the number of active upload workers. This counts individual
	 * upload connections, not logical uploads which might use more than
	 * one connection at a time.
	 *
	 * @return number of upload connection threads currently running
	 */
	public static int getNumberOfUploads() {
		return numConnections.get();
	}

	/**
	 * Create a new upload task for the given file.
	 *
	 * @param host host to connect to for the upload
	 * @param port port to connect to for the upload
	 * @param uploadToken token identifying this upload towards the server
	 * @param uploadFile local file to upload
	 * @throws FileNotFoundException if the file cannot be read
	 */
	public UploadTask(String host, int port, String uploadToken, File uploadFile)
			throws FileNotFoundException {
		super(uploadFile, uploadFile.length());
		if (!uploadFile.canRead())
			throw new FileNotFoundException();
		// TODO: SSL
		this.host = host;
		this.port = port;
		this.uploadToken = uploadToken;
		this.startTime = System.currentTimeMillis();
	}

	/**
	 * A single upload connection. Several of these may run concurrently for
	 * the same upload.
	 */
	private class UploadThread extends TransferThread {

		public UploadThread() {
			super("UpConn#" + THREAD_ID.incrementAndGet());
		}

		/** Smoothed speed in bytes per second; guarded by this thread's monitor. */
		private long currentSpeed = 0;
		private Uploader uploader = null;

		/**
		 * Wraps {@link #run2()} so the global connection counter is always
		 * decremented again, no matter how the connection ends.
		 */
		@Override
		public void run() {
			numConnections.incrementAndGet();
			try {
				run2();
			} finally {
				numConnections.decrementAndGet();
			}
		}

		/**
		 * Connects to the server, uploads the file and reports the outcome
		 * through connectFailed / connectSucceeded / transferEnded.
		 */
		public void run2() {
			try {
				uploader = new Uploader(host, port, Config.TRANSFER_TIMEOUT, null, uploadToken);
			} catch (Exception e) {
				LOGGER.warn("Could not initialize new uploader", e);
				consecutiveInitFails.incrementAndGet();
				connectFailed(this);
				return;
			} // TODO: SSL
			connectSucceeded(this);
			final UploadThread thread = this;

			final boolean ret = uploader.upload(localFile.getAbsolutePath(), new UploadStatusCallback() {
				// progress counter
				private long currentBytes = 0;
				// NOTE: starts at 0, so the very first sample divides by the full
				// epoch timestamp and yields a speed of effectively 0.
				private long lastUpdate = 0;
				private long lastBytes = 0;

				/**
				 * Accumulates the reported byte count and, once per update
				 * interval, recalculates the smoothed speed of this connection.
				 */
				@Override
				public void uploadProgress(long bytesSent) {
					currentBytes += bytesSent;
					final long now = System.currentTimeMillis();
					if (lastUpdate + UPDATE_INTERVAL_MS < now) {
						synchronized (thread) {
							// Calculate updated speed: weighted average, 2/3 old value, 1/3 new sample
							lastBytes = (lastBytes * 2 + currentBytes) / 3;
							currentSpeed = (1000 * lastBytes) / (now - lastUpdate);
							lastUpdate = now;
						}
						// Reset counters
						currentBytes = 0;
					}
				}

				@Override
				public void uploadError(String message) {
					fireErrorMessage(message);
				}
			});
			if (ret) {
				transferConnectionError = null;
				consecutiveInitFails.set(0);
			} else {
				String err = uploader.getRemoteError();
				// Only log when the error changes, to avoid flooding the log on retries
				if (err != null && !err.equals(transferConnectionError)) {
					LOGGER.warn("Upload task remote error: " + err);
					transferConnectionError = err;
				}
				consecutiveInitFails.incrementAndGet();
			}
			transferEnded(this, ret);
		}

		@Override
		public long getCurrentSpeed() {
			synchronized (this) {
				return currentSpeed;
			}
		}

		@Override
		protected Transfer getTransfer() {
			return uploader;
		}
	}

	/** Time (ms) of the last upload status query sent via thrift. */
	private long lastThriftUpdate = 0;
	/** Average speed since task creation, derived from completed blocks. */
	private long virtualSpeed = 0;
	/** Earliest time (ms) at which the next "querying" debug line is logged. */
	private long nextQueryDebug;

	/**
	 * Builds a snapshot of the current upload progress. At most once per
	 * thrift interval the block status is queried from the server; in between,
	 * state and blocks of the returned event are <code>null</code>. The
	 * current speed is the sum of the speeds of all active connections.
	 *
	 * @return event describing state, block status, speeds, remaining time
	 *         and an error message, if any
	 */
	@Override
	protected TransferEvent getTransferEvent() {
		final long now = System.currentTimeMillis();
		TransferState state = null;
		byte[] blocks = null;
		String error = null;
		if (lastThriftUpdate + THRIFT_INTERVAL_MS < now) {
			lastThriftUpdate = now;
			try {
				// Throttle this debug message to once every 30 seconds
				if (now > nextQueryDebug) {
					nextQueryDebug = now + 30000;
					LOGGER.debug("Querying upload status...");
				}
				TransferStatus uploadStatus = ThriftManager.getSatClient().queryUploadStatus(uploadToken);
				state = uploadStatus.getState();
				blocks = uploadStatus.getBlockStatus();
			} catch (TInvalidTokenException e) {
				error = "Upload token unknown!?";
				state = TransferState.ERROR;
				LOGGER.warn("Cannot query upload status: Token not known by the server");
			} catch (Exception e) {
				error = "Exception quering upload status: " + e.toString();
				LOGGER.warn("Cannot query upload status", e);
			}
		}
		long speed = 0;
		long timeRemaining = 0;
		synchronized (transfers) {
			for (TransferThread thread : transfers) {
				speed += thread.getCurrentSpeed();
			}
		}
		if (speed != 0) {
			// 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying, 5 = hashing
			if (blocks != null) {
				int missing = 0;
				for (byte b : blocks) {
					if (b != 0) {
						missing++;
					}
				}
				// Widen to long before multiplying so large files cannot overflow int arithmetic
				final long bytesRemaining = CHUNK_SIZE * (long) missing;
				final long bytesDone = CHUNK_SIZE * (long) (blocks.length - missing);
				timeRemaining = (1000 * bytesRemaining) / (speed + 1);
				virtualSpeed = (bytesDone * 1000) / (System.currentTimeMillis() - startTime + 1);
			}
		}
		// Read the shared field exactly once: an upload thread may reset it to
		// null at any time, so checking and dereferencing it separately could NPE.
		final String connectionError = transferConnectionError;
		if (connectionError != null && (error == null || "Out of disk space".equals(connectionError))) {
			error = connectionError;
		}
		TransferEvent event = new TransferEvent(state, blocks, speed, virtualSpeed, timeRemaining, error);
		return event;
	}

	@Override
	protected TransferThread createNewThread() {
		return new UploadThread();
	}
}