summaryrefslogblamecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/Downloader.java
blob: 5aff94b3e1d3a0380053dd13972ec8d858c2b3d5 (plain) (tree)
1
2
3
4
5
6
7
8
9

                                 
                               
                    
                                     
                           
                           
                                
                       

                                
 

                                           

                                           
 
                                        
 
 
                                                                                   




                                                                                       

                                                                                 
                                                                           
           


                                                         
           
                                                                                                                          
         
                                                                 
                                                                 


                                                                        

         

                                                                                 
                                                                                  
           
                                                                                  
                              
           
                                                                
         
                                     
                                                                 

         








                                                                                                      
                                                                                                 
         
                                             
                     





                                                                                                 















                                                                                                                                          
                                                   

                 























                                                                                               
                                                                                                                          































                                                                                                              
 









                                                                                                         
                                       







                                                                                                     












                                                                                                     
                                                                                    

                                                                                                                                                            

                                                     



                                                                      










                                                                                                                                              



                                                                                                   
                                                                        





                                                                                                                                            



                                                                             

                                                                                                                   
                                                


                                                                 
                                                                                                                                       



                                                                                                           
                                                                   
                                                                                                     






                                                                                                              

                                                                                                                                  

                                                             
                                                       
                                 
                         

                                        
                                                  

                                                                
                                                 
                         
                           
                                           
                 

                            
 
 
package org.openslx.filetransfer;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.Socket;

import javax.net.ssl.SSLContext;

import net.jpountz.lz4.LZ4FastDecompressor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Downloader extends Transfer
{

	private static final Logger log = LogManager.getLogger( Downloader.class );
	
	private final LZ4FastDecompressor decompressor = lz4factory.fastDecompressor();
	
	private final Lz4InStream compressedIn;
	
	/***********************************************************************/
	/**
	 * Actively initiate a connection to a remote peer for downloading.
	 * 
	 * @param host Host name or address to connect to
	 * @param port Port to connect to
	 * @throws IOException
	 */
	public Downloader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException
	{
		super( host, port, readTimeoutMs, context, log );
		compressedIn = new Lz4InStream( dataFromServer );
		outStream.writeByte( 'D' );
		if ( !sendToken( token ) || !sendEndOfMeta() )
			throw new IOException( "Sending token failed" );
	}

	/***********************************************************************/
	/**
	 * Constructor used by Listener to create an incoming download connection.
	 * 
	 * @param socket established connection to peer which requested an upload.
	 * @throws IOException
	 */
	protected Downloader( Socket socket ) throws IOException
	{
		super( socket, log );
		compressedIn = new Lz4InStream( dataFromServer );
	}

	/**
	 * Initiate the download. This method does not return until the file transfer finished.
	 * 
	 * @param destinationFile destination file name to download to
	 * @param rangeCallback this object's .get() method is called whenever the downloader needs to
	 *           know which part of the file to request next. This method should return null if no
	 *           more parts are needed, which in turn let's this method return true
	 * @return true on success, false otherwise
	 */
	public boolean download( final String destinationFile, final WantRangeCallback callback )
	{
		RandomAccessFile file = null;
		try {
			try {
				file = new RandomAccessFile( new File( destinationFile ), "rw" );
			} catch ( FileNotFoundException e2 ) {
				log.error( "Cannot open " + destinationFile + " for writing." );
				return false;
			}
			final RandomAccessFile f = file;
			DataReceivedCallback cb = new DataReceivedCallback() {
				public boolean dataReceived( final long fileOffset, final int dataLength, final byte[] data )
				{
					try {
						f.seek( fileOffset );
						f.write( data, 0, dataLength );
					} catch ( Exception e ) {
						log.error( "Could not write to file " + destinationFile + " at offset " + fileOffset, e );
						return false;
					}
					return true;
				}
			};
			return download( cb, callback );
		} finally {
			Transfer.safeClose( file );
		}
	}
	
	private class Lz4InStream extends InputStream
	{
		private final DataInputStream parentStream;
		
		private long compressed, uncompressed;
		
		private byte[] buffer;
		
		public Lz4InStream( DataInputStream in )
		{
			parentStream = in;
			log.info( "DeCompressor: " + decompressor.getClass().getSimpleName() );
		}

		@Override
		public int read( byte b[], int off, int len ) throws IOException
		{
			try {
				int decompressedLength = parentStream.readInt();
				int compressedLength = parentStream.readInt();
				compressed += compressedLength;
				uncompressed += decompressedLength;
				if ( decompressedLength > len ) {
					// TODO: Partial reads with buffering, if remote payload is larger than our buffer
					throw new RuntimeException( "This should never happen! ;)" );
				}
				if ( decompressedLength == compressedLength ) {
					parentStream.readFully( b, off, decompressedLength );
				} else {
					// Compressed
					if ( buffer == null || buffer.length < compressedLength ) {
						buffer = new byte[ compressedLength ];
					}
					parentStream.readFully( buffer, 0, compressedLength );
					decompressor.decompress( buffer, 0, b, off, decompressedLength );
				}
				return decompressedLength;
			} catch ( Throwable e ) {
				throw new IOException( e );
			}
		}

		@Override
		public int read() throws IOException
		{
			throw new UnsupportedOperationException( "Cant do this!" );
		}
		
		public void printStats()
		{
			if ( compressed == 0 )
				return;
			log.info( "Received bytes: " + compressed + ", decompressed bytes: " + uncompressed );
		}

	}

	/**
	 * Initiate the download. This method does not return until the file transfer finished.
	 * 
	 * @param dataCallback this object's .dataReceived() method is called whenever a chunk of data is
	 *           received
	 * @param rangeCallback this object's .get() method is called whenever the downloader needs to
	 *           know which part of the file to request next. This method should return null if no
	 *           more parts are needed, which in turn let's this method return true
	 * @return true on success, false otherwise
	 */
	@SuppressWarnings( "resource" )
	public boolean download( DataReceivedCallback dataCallback, WantRangeCallback rangeCallback )
	{
		if ( shouldGetToken() ) {
			log.error( "You didn't call getToken yet!" );
			return false;
		}
		FileRange requestedRange;
		try {
			byte[] incoming = new byte[ 500000 ];
			/* TODO once the Lz4InputStream can handle small buffer sizes / partial reads
			for ( int bufsiz = 600; bufsiz >= 100 && incoming == null; bufsiz -= 100 ) {
				try {
					incoming = new byte[ bufsiz * 1024 ];
				} catch ( OutOfMemoryError e ) {
				}
			}
			if ( incoming == null ) {
				log.error( "Could not allocate buffer for receiving." );
				return false;
			}
			*/
			while ( ( requestedRange = rangeCallback.get() ) != null ) {
				if ( requestedRange.startOffset < 0 || requestedRange.startOffset >= requestedRange.endOffset ) {
					log.error( "Callback supplied bad range (" + requestedRange.startOffset + " to " + requestedRange.endOffset + ")" );
					return false;
				}
				if ( useCompression ) {
					// Request compressed transfer
					sendUseCompression();
				}
				// Send range request
				if ( !sendRange( requestedRange.startOffset, requestedRange.endOffset ) || !sendEndOfMeta() ) {
					log.error( "Could not send next range request, download failed." );
					return false;
				}
				// See if remote peer acknowledges range request
				MetaData meta = readMetaData();
				if ( meta == null ) {
					log.error( "Did not receive meta data from uploading remote peer after requesting range, aborting." );
					return false;
				}
				if ( getRemoteError() != null ) {
					log.error( "Remote peer sent error: " + getRemoteError() );
					return false;
				}
				FileRange remoteRange = meta.getRange();
				if ( remoteRange == null ) {
					log.error( "Remote metadata does not contain range confirmation. " + meta );
				}
				if ( !remoteRange.equals( requestedRange ) ) {
					log.error( "Confirmed range by remote peer (" + remoteRange
							+ ") does not match requested range (" + requestedRange + "), aborting download." );
					return false;
				}
				// Receive requested range
				int chunkLength = requestedRange.getLength();
				// If the uploader sets the COMPRESS field, assume compressed chunk
				InputStream inStream = meta.peerWantsCompression() ? compressedIn : dataFromServer;
				int hasRead = 0;
				while ( hasRead < chunkLength ) {
					int ret;
					try {
						ret = inStream.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) );
						if ( Thread.currentThread().isInterrupted() ) {
							log.debug( "Thread interrupted in download loop" );
							return false;
						}
					} catch ( IOException e ) {
						log.error( "Could not read payload from socket", e );
						sendErrorCode( "payload read error" );
						return false;
					}
					if ( ret == -1 ) {
						log.info( "Remote peer unexpectedly closed the connection." );
						return false;
					}
					if ( !dataCallback.dataReceived( requestedRange.startOffset + hasRead, ret, incoming ) ) {
						this.close( "Aborting due to I/O error..." );
						return false;
					}
					hasRead += ret;
				}
			}
			sendDone();
			sendEndOfMeta();
			compressedIn.printStats();
			try {
				transferSocket.shutdownOutput();
			} catch ( Exception e ) {
			}
		} finally {
			this.close( null );
		}
		return true;
	}

}