package org.openslx.filetransfer; import java.io.DataInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.Socket; import javax.net.ssl.SSLContext; import net.jpountz.lz4.LZ4FastDecompressor; import org.apache.log4j.Logger; public class Downloader extends Transfer { private static final Logger log = Logger.getLogger( Downloader.class ); private final LZ4FastDecompressor decompressor = lz4factory.fastDecompressor(); private final Lz4InStream compressedIn; /***********************************************************************/ /** * Actively initiate a connection to a remote peer for downloading. * * @param host Host name or address to connect to * @param port Port to connect to * @throws IOException */ public Downloader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException { super( host, port, readTimeoutMs, context, log ); compressedIn = new Lz4InStream( dataFromServer ); outStream.writeByte( 'D' ); if ( !sendToken( token ) || !sendEndOfMeta() ) throw new IOException( "Sending token failed" ); } /***********************************************************************/ /** * Constructor used by Listener to create an incoming download connection. * * @param socket established connection to peer which requested an upload. * @throws IOException */ protected Downloader( Socket socket ) throws IOException { super( socket, log ); compressedIn = new Lz4InStream( dataFromServer ); } /** * Initiate the download. This method does not return until the file transfer finished. * * @param destinationFile destination file name to download to * @param rangeCallback this object's .get() method is called whenever the downloader needs to * know which part of the file to request next. This method should return null if no * more parts are needed, which in turn let's this method return true * @return true on success, false otherwise */ public boolean download( final String destinationFile, final WantRangeCallback callback ) { RandomAccessFile file = null; try { try { file = new RandomAccessFile( new File( destinationFile ), "rw" ); } catch ( FileNotFoundException e2 ) { log.error( "Cannot open " + destinationFile + " for writing." ); return false; } final RandomAccessFile f = file; DataReceivedCallback cb = new DataReceivedCallback() { public boolean dataReceived( final long fileOffset, final int dataLength, final byte[] data ) { try { f.seek( fileOffset ); f.write( data, 0, dataLength ); } catch ( Exception e ) { log.error( "Could not write to file " + destinationFile + " at offset " + fileOffset, e ); return false; } return true; } }; return download( cb, callback ); } finally { Transfer.safeClose( file ); } } private class Lz4InStream extends InputStream { private final DataInputStream parentStream; private long compressed, uncompressed; private byte[] buffer; public Lz4InStream( DataInputStream in ) { parentStream = in; log.info( "DeCompressor: " + decompressor.getClass().getSimpleName() ); } @Override public int read( byte b[], int off, int len ) throws IOException { try { int decompressedLength = parentStream.readInt(); int compressedLength = parentStream.readInt(); compressed += compressedLength; uncompressed += decompressedLength; if ( decompressedLength > len ) { throw new RuntimeException( "This should never happen! ;)" ); } if ( decompressedLength == compressedLength ) { parentStream.readFully( b, off, decompressedLength ); } else { // Compressed if ( buffer == null || buffer.length < compressedLength ) { buffer = new byte[ compressedLength ]; } parentStream.readFully( buffer, 0, compressedLength ); decompressor.decompress( buffer, 0, b, off, decompressedLength ); } return decompressedLength; } catch ( Throwable e ) { throw new IOException( e ); } } @Override public int read() throws IOException { throw new UnsupportedOperationException( "Cant do this!" ); } public void printStats() { if ( compressed == 0 ) return; log.info( "Received bytes: " + compressed + ", decompressed bytes: " + uncompressed ); } } /** * Initiate the download. This method does not return until the file transfer finished. * * @param dataCallback this object's .dataReceived() method is called whenever a chunk of data is * received * @param rangeCallback this object's .get() method is called whenever the downloader needs to * know which part of the file to request next. This method should return null if no * more parts are needed, which in turn let's this method return true * @return true on success, false otherwise */ @SuppressWarnings( "resource" ) public boolean download( DataReceivedCallback dataCallback, WantRangeCallback rangeCallback ) { if ( shouldGetToken() ) { log.error( "You didn't call getToken yet!" ); return false; } FileRange requestedRange; try { while ( ( requestedRange = rangeCallback.get() ) != null ) { if ( requestedRange.startOffset < 0 || requestedRange.startOffset >= requestedRange.endOffset ) { log.error( "Callback supplied bad range (" + requestedRange.startOffset + " to " + requestedRange.endOffset + ")" ); return false; } if ( useCompression ) { // Request compressed transfer sendUseCompression(); } // Send range request if ( !sendRange( requestedRange.startOffset, requestedRange.endOffset ) || !sendEndOfMeta() ) { log.error( "Could not send next range request, download failed." ); return false; } // See if remote peer acknowledges range request MetaData meta = readMetaData(); if ( meta == null ) { log.error( "Did not receive meta data from uploading remote peer after requesting range, aborting." ); return false; } FileRange remoteRange = meta.getRange(); if ( remoteRange == null || !remoteRange.equals( requestedRange ) ) { log.error( "Confirmed range by remote peer does not match requested range, aborting download." ); return false; } // Receive requested range int chunkLength = requestedRange.getLength(); // If the uploader sets the COMPRESS field, assume compressed chunk InputStream inStream = meta.peerWantsCompression() ? compressedIn : dataFromServer; byte[] incoming = new byte[ 500000 ]; // 500kb int hasRead = 0; while ( hasRead < chunkLength ) { int ret; try { ret = inStream.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) ); if ( Thread.currentThread().isInterrupted() ) { log.debug( "Thread interrupted in download loop" ); return false; } } catch ( IOException e ) { log.error( "Could not read payload from socket" ); sendErrorCode( "payload read error" ); return false; } if ( ret == -1 ) { log.info( "Remote peer unexpectedly closed the connection." ); return false; } if ( !dataCallback.dataReceived( requestedRange.startOffset + hasRead, ret, incoming ) ) { this.close( "Aborting due to I/O error..." ); return false; } hasRead += ret; } } sendDone(); sendEndOfMeta(); compressedIn.printStats(); try { transferSocket.shutdownOutput(); } catch ( IOException e ) { } } finally { this.close( null ); } return true; } }