summaryrefslogblamecommitdiffstats
path: root/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java
blob: cca429298ce87c8afd15335a560452ff5b869511 (plain) (tree)
1
2
3
4
5
6
7
8
9
10





                                        
                                                 
 

                                           
                                                   
                                         


                                         
                                                                              
 
                                                                                      
 











                                                                                               

                                                                                 




                                                                                
                                                    


                                                     
                                               
 

                                       
 









                                                                 

                                                                            





                                                                                                                  

                                              



                                                       
                                                                 




                                                                  
                 



                                                            
                         


                                           
                          
                                                      



                                                

         

                                     
         
 
                 


                                     

                                                     


                                                                                                                             
                                          

                                                                         




                                                         
                                                                                      





                                                            





                                                                                                         




                                                        
                 





                                                                 
                 







                                                                    
                                                   
                                          








                                                            

                                                         




                                             
                                                      
                                       







                                                                                                

                                                                                                
 









                                                                        








                                                                   

                                                                                                                 





                                                                      

                                                      

                                                                                                                            

                                       
                 

                                              




                                                                                    

                                                                                                                                  



                                                                                                                                
                                                                      

                                                         
                                                                     

                                               
                                                             
                                                                                                 

                                                                                                                 
                         
                 
                                 








                                                                                    

                                                  

         





                                                



                                 









                                                                       
package org.openslx.dozmod.filetransfer;

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.dozmod.thrift.Session;
import org.openslx.filetransfer.Transfer;
import org.openslx.util.Util;

/**
 * Base class of a file transfer that is carried out over one or more parallel
 * connections, each of which is driven by its own {@link TransferThread}.
 * <p>
 * The task itself is a {@link Runnable}: its {@link #run()} method periodically
 * polls the subclass for a status event, forwards it to all registered
 * listeners, and makes sure that enough connections are open. Subclasses supply
 * the status via {@link #getTransferEvent()} and new connections via
 * {@link #createNewThread()}, and report connection life cycle changes back
 * through {@link #connectFailed(TransferThread)},
 * {@link #connectSucceeded(TransferThread)} and
 * {@link #transferEnded(TransferThread, boolean)}.
 * <p>
 * The lists of connections as well as {@code endgame}, {@code minConnectionCount}
 * and {@code lastConnectionAttempt} are guarded by the monitor of
 * {@link #transfers}; the listener list is guarded by its own monitor.
 */
public abstract class TransferTask implements Runnable, TransferEventEmitter {

	private static final Logger LOGGER = LogManager.getLogger(TransferTask.class);

	protected static final double BYTES_PER_MIB = 1024 * 1024;
	protected static final long CHUNK_SIZE = 16 * 1024 * 1024;

	/**
	 * Update interval of transfer status (speed only)
	 */
	protected static final double UPDATE_INTERVAL_SECONDS = 0.6;
	protected static final int UPDATE_INTERVAL_MS = (int) (UPDATE_INTERVAL_SECONDS * 1000);

	/**
	 * Additional delay, per consecutive failed connection attempt, before another
	 * connection is tried. Declared as long so the back-off is computed in 64 bit.
	 */
	private static final long RECONNECT_BACKOFF_MS = 10000;

	/**
	 * Extra delay before reconnecting after the last active connection ended
	 * successfully.
	 */
	private static final long RECONNECT_THROTTLE_MS = 3000;

	/**
	 * Connections that have been established. Also serves as the lock object for
	 * all connection bookkeeping of this task.
	 */
	protected final List<TransferThread> transfers = new ArrayList<>();

	/**
	 * Connections that have been started but have not reported success or
	 * failure yet.
	 */
	private final List<TransferThread> connectingTransfers = new ArrayList<>();

	/**
	 * Number of consecutive failed connection attempts. Only read by this class
	 * (to compute the reconnect back-off); maintained by subclasses.
	 */
	protected final AtomicInteger consecutiveInitFails = new AtomicInteger();

	/**
	 * List of listeners that want to get status updates about the transfer.
	 */
	private final List<TransferEventListener> listeners = new ArrayList<>();

	private volatile boolean isComplete = false;
	private volatile boolean isCancelled = false;

	/**
	 * Set once a connection ended successfully; while set, no further connections
	 * are opened as long as any connection is still active or connecting.
	 */
	private boolean endgame = false;
	private int minConnectionCount = 1;

	/**
	 * Timestamp (ms) of the last connection attempt; may lie in the future to
	 * throttle reconnects.
	 */
	private long lastConnectionAttempt = 0;

	protected final File localFile;
	protected final long fileSize;

	/**
	 * @param localFile local file this transfer reads from or writes to
	 * @param fileSize size of the file being transferred
	 */
	protected TransferTask(File localFile, long fileSize) {
		this.localFile = localFile;
		this.fileSize = fileSize;
	}

	/**
	 * Main loop of the transfer worker. Polls {@link #getTransferEvent()} every
	 * {@link #UPDATE_INTERVAL_MS} milliseconds, forwards the event to the
	 * listeners and opens new connections if required. The loop ends when the
	 * task is cancelled, the calling thread is interrupted, or an event with
	 * state {@code FINISHED} or {@code ERROR} was seen.
	 * <p>
	 * On leaving the loop - also if it is left through an unchecked exception -
	 * all connections are cancelled and joined and {@link #cleanup()} is called.
	 */
	@Override
	public final void run() {
		try {
			while (!isCancelled && !Thread.interrupted()) {
				TransferEvent event = getTransferEvent();
				if (event != null) {
					if (event.state == TransferState.FINISHED) {
						isComplete = true;
					}
					if (event.state == TransferState.ERROR) {
						isCancelled = true;
					}
					fireEvent(event);
					if (event.state == TransferState.ERROR || event.state == TransferState.FINISHED) {
						break;
					}
				}
				ensureActivity();
				Util.sleep(UPDATE_INTERVAL_MS);
			}
			LOGGER.info("Transfer worker mainloop finished");
		} finally {
			// Always tear down, so a failing status poll or listener cannot leave
			// connection threads running or skip the subclass cleanup.
			try {
				stopAllTransfers();
			} finally {
				cleanup();
			}
		}
		LOGGER.info("Trasfer worker exiting");
	}

	/**
	 * Marks the task as cancelled, then cancels, interrupts and joins every
	 * connection thread that is either active or still connecting.
	 */
	private void stopAllTransfers() {
		List<TransferThread> joinList = new ArrayList<>();
		synchronized (transfers) {
			// Set under the lock so that connectSucceeded() cannot add a new
			// connection after the snapshot has been taken.
			isCancelled = true;
			joinList.addAll(transfers);
			joinList.addAll(connectingTransfers);
		}
		for (TransferThread t : joinList) {
			Transfer transfer = t.getTransfer();
			if (transfer != null) {
				transfer.cancel();
			}
			t.interrupt();
			Util.joinThread(t);
		}
	}

	/**
	 * Hook that is called once at the end of {@link #run()}, after all connection
	 * threads have been joined.
	 */
	protected void cleanup() {
		// By default, this does nothing
	}

	/**
	 * @return true once {@link #run()} has seen an event with state
	 *         {@code FINISHED}
	 */
	public boolean isComplete() {
		return isComplete;
	}

	/**
	 * @return true once the task has been cancelled, has seen an event with state
	 *         {@code ERROR}, or its worker has left the main loop (the teardown in
	 *         {@link #run()} sets the flag unconditionally)
	 */
	@Override
	public boolean isCanceled() {
		return isCancelled;
	}

	/**
	 * Delivers the event to all registered listeners, most recently added first.
	 * Events that carry an error message are additionally logged as a warning.
	 * The listener lock is held while the listeners are being called.
	 *
	 * @param event event to deliver
	 */
	private void fireEvent(TransferEvent event) {
		if (event.errorMessage != null) {
			LOGGER.warn("(" + this.getClass().getSimpleName() + ") fireEvent with error: " + event.errorMessage);
		}
		synchronized (listeners) {
			// Iterating by index from the end tolerates a listener removing itself
			// from within update() (the monitor is reentrant).
			for (int i = listeners.size() - 1; i >= 0; --i) {
				listeners.get(i).update(event);
			}
		}
	}

	/**
	 * Sends an event to all listeners that is built from the given message only;
	 * all other event fields are passed as null or 0.
	 *
	 * @param message message to hand to the listeners
	 */
	protected void fireErrorMessage(String message) {
		TransferEvent event = new TransferEvent(null, null, 0, 0, 0, message);
		fireEvent(event);
	}

	/**
	 * Called periodically from {@link #run()} to obtain the current status.
	 *
	 * @return the current status, or null if there is nothing to report; a state
	 *         of {@code FINISHED} or {@code ERROR} terminates the main loop
	 */
	protected abstract TransferEvent getTransferEvent();

	/**
	 * Sets the number of connections this task tries to keep open. If the
	 * satellite configuration of the current session declares a positive maximum
	 * number of connections per transfer, the value is capped to that maximum.
	 *
	 * @param count desired number of connections
	 */
	public final void setMinConnections(int count) {
		if (Session.getSatelliteConfig() != null
				&& Session.getSatelliteConfig().isSetMaxConnectionsPerTransfer()
				&& Session.getSatelliteConfig().getMaxConnectionsPerTransfer() > 0
				&& Session.getSatelliteConfig().getMaxConnectionsPerTransfer() < count) {
			count = Session.getSatelliteConfig().getMaxConnectionsPerTransfer();
		}
		synchronized (transfers) {
			this.minConnectionCount = count;
		}
	}

	/**
	 * Registers a listener for status events. Adding the same listener more than
	 * once makes it receive every event that many times.
	 *
	 * @param listener listener to add
	 */
	@Override
	public void addListener(TransferEventListener listener) {
		synchronized (listeners) {
			listeners.add(listener);
		}
	}

	/**
	 * Removes every registration of the given listener.
	 *
	 * @param listener listener to remove
	 */
	@Override
	public void removeListener(TransferEventListener listener) {
		synchronized (listeners) {
			// List.remove(Object) only drops the first match, so repeat until gone
			while (listeners.remove(listener)) {
			}
		}
	}

	/**
	 * Cancels the task: marks it as cancelled and cancels the transfer of every
	 * active or connecting connection. Does nothing if the task is already
	 * cancelled. Does not wait for the connection threads; joining them is done
	 * by {@link #run()} when it notices the cancellation.
	 */
	public void cancel() {
		final List<TransferThread> joiners;
		synchronized (transfers) {
			if (isCancelled) {
				return;
			}
			isCancelled = true;
			joiners = new ArrayList<>(transfers);
			joiners.addAll(connectingTransfers);
		}
		for (TransferThread t : joiners) {
			// Read once, so the null check and the call see the same object
			Transfer transfer = t.getTransfer();
			if (transfer != null) {
				transfer.cancel();
			}
		}
	}

	/**
	 * Opens one additional connection if the task is still running and fewer than
	 * the configured number of connections are active or connecting. Connections
	 * whose transfer is no longer valid are dropped from the active list first.
	 * New attempts are delayed by {@link #RECONNECT_BACKOFF_MS} for every
	 * consecutive failed attempt.
	 */
	private final void ensureActivity() {
		synchronized (transfers) {
			if (isCancelled || isComplete) {
				return;
			}
			if (endgame && (!transfers.isEmpty() || !connectingTransfers.isEmpty())) {
				return;
			}
			Iterator<TransferThread> it = transfers.iterator();
			while (it.hasNext()) {
				// A connection without a transfer object cannot be valid either;
				// dropping it instead of failing keeps the main loop alive.
				Transfer transfer = it.next().getTransfer();
				if (transfer == null || !transfer.isValid()) {
					it.remove();
				}
			}
			if (transfers.size() + connectingTransfers.size() >= minConnectionCount) {
				return;
			}

			long now = System.currentTimeMillis();
			// long arithmetic: the product cannot wrap around for large fail counts
			long backoff = consecutiveInitFails.get() * RECONNECT_BACKOFF_MS;
			if (lastConnectionAttempt + backoff > now) {
				return;
			}
			lastConnectionAttempt = now;
			TransferThread thread = createNewThread();
			if (thread != null) {
				thread.setDaemon(true);
				connectingTransfers.add(thread);
				thread.start();
			}
		}
	}

	/**
	 * Creates, but does not start, a thread that will establish and drive one
	 * additional connection. Called with the lock on {@link #transfers} held.
	 *
	 * @return the new thread, or null if no connection should be opened now
	 */
	protected abstract TransferThread createNewThread();

	/**
	 * To be called when the given thread failed to establish its connection.
	 *
	 * @param thread thread that was connecting
	 */
	protected final void connectFailed(TransferThread thread) {
		synchronized (transfers) {
			connectingTransfers.remove(thread);
			LOGGER.info("Establishing new transfer connection failed, [a:" + transfers.size() + "/c:"
					+ connectingTransfers.size() + "]");
		}
	}

	/**
	 * To be called when the given thread has established its connection. The
	 * thread is moved to the list of active connections, unless the task has been
	 * cancelled in the meantime; in that case the connection is cancelled and the
	 * thread interrupted right away.
	 *
	 * @param thread thread that was connecting
	 */
	protected final void connectSucceeded(TransferThread thread) {
		synchronized (transfers) {
			connectingTransfers.remove(thread);
			if (!isCancelled) {
				transfers.add(thread);
				LOGGER.info("Establishing new transfer connection succeeded, [a:" + transfers.size() + "/c:"
						+ connectingTransfers.size() + "]");
				return;
			}
		}
		// Task was cancelled while this connection was being set up
		Transfer transfer = thread.getTransfer();
		if (transfer != null) {
			transfer.cancel();
		}
		thread.interrupt();
	}

	/**
	 * To be called when an established connection has ended. Updates the endgame
	 * state and, unless this was the last active connection and it ended
	 * successfully, checks whether a replacement connection has to be opened.
	 *
	 * @param thread thread whose connection ended
	 * @param success whether the connection reported success
	 */
	protected final void transferEnded(TransferThread thread, boolean success) {
		synchronized (transfers) {
			transfers.remove(thread);
			LOGGER.info("A transfer connection has finished (success=" + success + "), [a:" + transfers.size() + "/c:"
					+ connectingTransfers.size() + "]");
			if (endgame && !success && transfers.isEmpty()) {
				// We had a transfer that reported success before, so we assume there are no more pending blocks
				// not being actively transfered already. Only trigger a new upload if this was the last active
				// transfer and it failed. This also resets endgame mode.
				LOGGER.debug("Disabled endgame mode");
				endgame = false;
			} else if (!endgame && success) {
				LOGGER.debug("Enabled endgame mode");
				endgame = true;
			}
			if (success && transfers.isEmpty()) {
				LOGGER.debug("Transfer might have finished, waiting for server");
				lastConnectionAttempt = System.currentTimeMillis() + RECONNECT_THROTTLE_MS; // Throttle reconnects
				return; // Skip ensureActivity check
			}
		}
		ensureActivity();
	}

	/**
	 * Get the number of consecutive connection fails. This counter is only
	 * increased if there is no active transfer running, and is reset as soon as
	 * one transfer successfully connects.
	 * 
	 * @return connect fails
	 */
	public final int getFailCount() {
		return consecutiveInitFails.get();
	}

	/**
	 * @return number of currently established connections (connections that are
	 *         still being set up are not counted)
	 */
	public int getTransferCount() {
		synchronized (transfers) {
			return transfers.size();
		}
	}

	/**
	 * @return the local file passed to the constructor
	 */
	public File getFile() {
		return localFile;
	}

	/**
	 * Thread that establishes and drives a single connection of a
	 * {@link TransferTask}.
	 */
	protected abstract static class TransferThread extends Thread {
		@Override
		public abstract void run();

		/**
		 * @return the transfer driven by this thread; the enclosing class treats a
		 *         null return as "no transfer object available"
		 */
		protected abstract Transfer getTransfer();

		/**
		 * @return current speed of this connection (unit is defined by the
		 *         implementation)
		 */
		public abstract long getCurrentSpeed();
	}

}