summaryrefslogblamecommitdiffstats
path: root/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java
blob: 29f744597903209b9c25ddd69e2a9b5effeeef82 (plain) (tree)
1
2
3
4
5
6
7
8
9


                                        
                                     
                                
                      
 
                               
                                                   
                                 


                                                     
                                         
                                                  


                                               


                                                                        
   
                                                





                                                                                  


                                           

                                                  
                                     
                                            

                                                                                                             
                                                                             
                                                 


                                                   

                                                                              
                                                            

         





                                                                                          

                                            


                                        

                                                              





                                                                   


                                                                     
                                             



                                                                                                             




                                                                                                                      
                                                   

                                                                    






                                                                                               

                                                 
                                            

                 




                                                    
 
         
 



                                                                           
                     



                                                                                     
                                                          











                                                                                                        
                             


                                                                                                                      
                                                                       
                                                    
                                       








                                                                       
                         

                                                 
 



                                                  
 


                                                    

                 














                                                                          

                                                                            

                                                       





                                                      
                                      



                                                                  
                 
                                                                                                




                                                  
                                 


                                                                                
                                                                                                                                        
                 
                                                                                                     
         
 


                                                    
         
 
package org.openslx.dozmod.filetransfer;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.util.List;

import org.apache.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.dozmod.Config;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.Transfer;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.filetransfer.util.ChunkList;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.util.Util;

/**
 * Execute file download in a background thread and update the progress.
 */
public class DownloadTask extends TransferTask {

	/**
	 * Logger instance for this class.
	 */
	private final static Logger LOGGER = Logger.getLogger(DownloadTask.class);

	private final String host;
	private final int port;
	private final String downloadToken;
	private final RandomAccessFile fileHandle;
	private final ChunkList chunks;
	private final long startTime;
	private boolean fileWritable = true;

	public DownloadTask(String host, int port, String downloadToken, File destinationFile, long fileSize,
			List<byte[]> sha1Sums) throws FileNotFoundException {
		super(destinationFile, fileSize);
		this.host = host;
		this.port = port;
		this.downloadToken = downloadToken;
		this.fileHandle = new RandomAccessFile(destinationFile, "rw");
		this.chunks = new ChunkList(fileSize, sha1Sums);
		this.startTime = System.currentTimeMillis();
	}

	private class DownloadHandler implements WantRangeCallback, DataReceivedCallback {
		private FileChunk current = null;
		private byte[] buffer = null;
		// progress counter
		private long currentSpeed = 0;
		private long currentBytes = 0;
		private long lastUpdate = 0;
		private long lastBytes = 0;

		@Override
		public FileRange get() {
			handleCompletedChunk(current, buffer);
			consecutiveInitFails.lazySet(0);
			try {
				current = chunks.getMissing();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				return null;
			}
			if (current == null)
				return null;
			buffer = new byte[current.range.getLength()];
			return current.range;
		}

		@Override
		public boolean dataReceived(final long fileOffset, final int dataLength, final byte[] data) {
			if (current == null)
				throw new IllegalStateException("dataReceived without current chunk");
			if (!current.range.contains(fileOffset, fileOffset + dataLength))
				throw new IllegalStateException("dataReceived with file data out of range");
			System.arraycopy(data, 0, buffer, (int) (fileOffset - current.range.startOffset), dataLength);
			currentBytes += dataLength;
			final long now = System.currentTimeMillis();
			if (lastUpdate + UPDATE_INTERVAL_MS < now) {
				synchronized (this) {
					// Calculate updated speed
					lastBytes = (lastBytes * 2 + currentBytes) / 3;
					currentSpeed = (1000 * lastBytes) / (now - lastUpdate);
					lastUpdate = now;
				}
				// Reset counters
				currentBytes = 0;
			}
			return fileWritable;
		}

		private long getCurrentSpeed() {
			synchronized (this) {
				return currentSpeed;
			}
		}

	}

	private void handleCompletedChunk(FileChunk chunk, byte[] buffer) {
		if (chunk == null)
			return;
		// TODO: Hash check, async
		try {
			synchronized (fileHandle) {
				fileHandle.seek(chunk.range.startOffset);
				fileHandle.write(buffer, 0, chunk.range.getLength());
			}
			chunks.markCompleted(chunk, true);
		} catch (Exception e) {
			LOGGER.error("Could not write to file at offset " + chunk.range.startOffset, e);
			fileWritable = false;
		}
	}

	private class DownloadThread extends TransferThread {
		private Downloader downloader = null;
		private DownloadHandler cb = new DownloadHandler();

		@Override
		public void run() {
			try {
				downloader = new Downloader(host, port, Config.TRANSFER_TIMEOUT, null, downloadToken);
			} catch (Exception e) {
				LOGGER.warn("Could not initialize new uploader", e);
				consecutiveInitFails.incrementAndGet();
				connectFailed(this);
				return;
			} // TODO: SSL
			connectSucceeded(this);

			boolean ret = downloader.download(cb, cb);
			if (!ret) {
				consecutiveInitFails.incrementAndGet();
			}
			if (cb.current != null) {
				chunks.markFailed(cb.current);
			}
			transferEnded(this, ret);
		}

		@Override
		protected Transfer getTransfer() {
			return downloader;
		}

		@Override
		public long getCurrentSpeed() {
			return cb.getCurrentSpeed();
		}

	}

	@Override
	protected void cleanup() {
		Util.safeClose(fileHandle);
	}

	@Override
	protected TransferEvent getTransferEvent() {
		final TransferState state;
		final byte[] progress = chunks.getStatusArray().array();
		final String error;
		if (consecutiveInitFails.get() > 20) {
			state = TransferState.ERROR;
			error = "Cannot talk to server after 20 tries...";
		} else if (chunks.isComplete() && getTransferCount() == 0) {
			Util.safeClose(fileHandle);
			state = TransferState.FINISHED;
			error = null;
		} else {
			state = TransferState.WORKING;
			error = null;
		}
		long speed = 0;
		long timeRemaining = 0;
		long virtualSpeed = 0;
		synchronized (transfers) {
			for (TransferThread thread : transfers) {
				speed += thread.getCurrentSpeed();
			}
		}
		// 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying
		if (progress != null) {
			int missing = 0;
			for (byte b : progress) {
				if (b != 0) {
					missing++;
				}
			}
			final long bytesRemaining = CHUNK_SIZE * (long) missing;
			timeRemaining = (1000 * bytesRemaining) / (speed + 1);
			virtualSpeed = ((progress.length - missing) * CHUNK_SIZE * 1000) / (System.currentTimeMillis() - startTime + 1);
		}
		return new TransferEvent(state, progress, speed, virtualSpeed, timeRemaining, error);
	}

	@Override
	protected TransferThread createNewThread() {
		return new DownloadThread();
	}
}