summaryrefslogblamecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/Uploader.java
blob: 6edc268d2c69a9d66411895670702635b54a657a (plain) (tree)
1
2
3
4
5
6
7
8
9

                                 
                                
                    
                                     
                           
                            
                                
                       

                                
 

                                     

                                           
 
                                      
 
 
                                                                                 




                                                                             

                                                                                 
                                                              
           


                                                                          
                              
           
                                                                                                                        
         
                                                                 
                                                              


                                                                        
         


                                                                                 

                                                  

                              
           
                                                           
         
                                     
                                                              
         


                                                                                 
                                                 
           
                          
           
                                                
         

                                                
        



                                                                                               






                                                            
                                                                    





















                                                                                                       
                                                                     

                                                                                                                                    
                                                                                                      
                                                                                  
                                                                   



                                                                                  
                                                                     







                                                                                    
                                                  
                                       

                                                                                                                            


                 
 

                                                                               

                                                                     

                                     
                                             
                     


                                                                                         
                                                                                                       

                                             











                                                                                                       
                                                                                                                                              


                                                                                                                 
                                                                                                                

                                                     

                                                                          



                                                                                                                    


                                                                                       
                                                                                                                 
                                                     
                                 


                                                                                         
                                                                                                                                    


                                                             
                                                                                                                             





                                                                                
                                                                                                                                                                         

                                                     

                                                                                                      
                                     



                                                                                              

                                                                                                                        
                                                                                                  


                                                             
                                                                                                                 


                                                               






                                                                                                                      
                                                                                                            


                                                             
                                                                                                                                                      



                                                             
                                                                             
                                                                   
                                                                                          

                                                             

                                                                               
                                 
                         
                           
                                                                   
                                                   

                            
         
 
 
package org.openslx.filetransfer;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;

import javax.net.ssl.SSLContext;

import net.jpountz.lz4.LZ4Compressor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Uploader extends Transfer
{

	private static final Logger log = LogManager.getLogger( Uploader.class );
	
	private final LZ4Compressor compressor = lz4factory.fastCompressor();
	
	private final Lz4OutStream compressedOut;
	
	/***********************************************************************/
	/**
	 * Actively establish upload connection to given peer.
	 * 
	 * @param host Host name or address to connect to
	 * @param port Port to connect to
	 * @param context ssl context for establishing a secure connection
	 * @throws IOException
	 */
	public Uploader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException
	{
		super( host, port, readTimeoutMs, context, log );
		compressedOut = new Lz4OutStream( outStream );
		outStream.writeByte( 'U' );
		if ( !sendToken( token ) || !sendEndOfMeta() )
			throw new IOException( "Sending token failed" );
	}

	/***********************************************************************/
	/**
	 * Constructor for master uploader.
	 * Sends back the socket for datatransfer.
	 * 
	 * @throws IOException
	 */
	public Uploader( Socket socket ) throws IOException
	{
		super( socket, log );
		compressedOut = new Lz4OutStream( outStream );
	}

	/***********************************************************************/
	/**
	 * Method for sending File with filename.
	 * 
	 * @param filename
	 */
	public boolean upload( String filename )
	{
		return upload( filename, null );
	}
	
	/**
	 * Compressing output stream that will either write LZ4-compressed data, or if the data
	 * doesn't compress well, just the original uncompressed data.
	 */
	private class Lz4OutStream extends OutputStream
	{
		
		private final DataOutputStream parentStream;
		
		private byte[] buffer;
		
		private long bytesSentTotal, bytesDecompressedTotal;
		
		private int chunksCompressed, chunksUncompressed;
		
		public Lz4OutStream( DataOutputStream out )
		{
			parentStream = out;
			log.info( "Compressor: " + compressor.getClass().getSimpleName() );
		}

		@Override
		public void write( int b ) throws IOException
		{
			throw new UnsupportedOperationException( "Cannot do this" );
		}

		@Override
		public void write( byte[] data, int off, int decompressedLength ) throws IOException
		{
			int maxCompressedLength = compressor.maxCompressedLength( decompressedLength );
			if ( buffer == null || buffer.length < maxCompressedLength ) {
				buffer = new byte[ maxCompressedLength ];
			}
			bytesDecompressedTotal += decompressedLength;
			int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength );
			parentStream.writeInt( decompressedLength );
			// Only send compressed data if we got down to at least ~88% the original size
			if ( ( compressedLength * 9 / 8 ) < decompressedLength ) {
				bytesSentTotal += compressedLength;
				chunksCompressed++;
				parentStream.writeInt( compressedLength );
				parentStream.write( buffer, 0, compressedLength );
			} else {
				bytesSentTotal += decompressedLength;
				chunksUncompressed++;
				parentStream.writeInt( decompressedLength );
				parentStream.write( data, off, decompressedLength );
			}
		}
		
		public void printStats()
		{
			if ( bytesSentTotal == 0 )
				return;
			log.info( "Bytes sent: " + bytesSentTotal + ", decompressed to: " + bytesDecompressedTotal );
			log.info( "Chunks sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed );
		}

	}

	public boolean upload( String filename, UploadStatusCallback callback )
	{
		if ( shouldGetToken() ) {
			log.error( "You didn't call getToken yet!" );
			return false;
		}
		RandomAccessFile file = null;
		try {
			try {
				file = new RandomAccessFile( new File( filename ), "r" );
			} catch ( FileNotFoundException e ) {
				this.close( "Could not open given file for reading.", callback, true );
				return false;
			}
			byte[] data = null;
			// Cannot go above 500000 for backwards compat
			for ( int bufsiz = 500; bufsiz >= 100 && data == null; bufsiz -= 100 ) {
				try {
					data = new byte[ bufsiz * 1000 ];
				} catch ( OutOfMemoryError e ) {
				}
			}
			if ( data == null ) {
				this.close( "Could not allocate buffer for reading.", callback, true );
				return false;
			}
			while ( !Thread.currentThread().isInterrupted() ) { // Loop as long as remote peer is requesting chunks from this file
				// Read meta data of remote peer - either new range, or it's telling us it's done
				MetaData meta = readMetaData();
				if ( meta == null ) {
					this.close( "Did not get meta data from remote peer.", callback, true );
					return false;
				}
				if ( meta.isDone() ) // Download complete?
					break;
				if ( getRemoteError() != null ) {
					this.close( "Remote peer sent error: " + getRemoteError(), callback, true );
					return false;
				}
				// Not complete, so there must be another range request
				FileRange requestedRange = meta.getRange();
				if ( requestedRange == null ) {
					this.close( "Peer did not include RANGE in meta data.", callback, true );
					return false;
				}
				// Range inside file?
				try {
					if ( requestedRange.endOffset > file.length() ) {
						this.close( "Requested range is larger than file size, aborting.", callback, true );
						return false;
					}
				} catch ( IOException e ) {
					this.close( "Could not get current length of file " + filename, callback, false, e );
					return false;
				}
				// Seek to requested chunk
				try {
					file.seek( requestedRange.startOffset );
				} catch ( IOException e ) {
					this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true, e );
					return false;
				}
				// Send confirmation of range and compression mode we're about to send
				OutputStream outStr = outStream;
				try {
					if ( meta.peerWantsCompression() && useCompression ) {
						sendUseCompression();
						outStr = compressedOut;
					}
					long ptr = file.getFilePointer();
					if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) {
						this.close( "Could not send range confirmation" );
						return false;
					}
				} catch ( IOException e ) {
					this.close( "Could not determine current position in file " + filename );
					return false;
				}
				// Finally send requested chunk
				int hasRead = 0;
				int length = requestedRange.getLength();
				while ( hasRead < length ) {
					int ret;
					try {
						ret = file.read( data, 0, Math.min( length - hasRead, data.length ) );
					} catch ( IOException e ) {
						this.close( "Error reading from file ", callback, true, e );
						return false;
					}
					if ( ret == -1 ) {
						this.close( "Error occured in Uploader.sendFile() while reading from File to send.", callback, true );
						return false;
					}
					hasRead += ret;
					try {
						outStr.write( data, 0, ret );
					} catch ( IOException e ) {
						this.close( "Sending payload failed", e );
						return false;
					}
					if ( callback != null )
						callback.uploadProgress( ret );
				}
			}
		} finally {
			Transfer.safeClose( file, transferSocket );
			compressedOut.printStats();
		}
		return true;
	}

}