diff options
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/Downloader.java')
-rw-r--r-- | src/main/java/org/openslx/filetransfer/Downloader.java | 78 |
1 files changed, 76 insertions, 2 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Downloader.java b/src/main/java/org/openslx/filetransfer/Downloader.java index cb933af..50162fc 100644 --- a/src/main/java/org/openslx/filetransfer/Downloader.java +++ b/src/main/java/org/openslx/filetransfer/Downloader.java @@ -1,20 +1,28 @@ package org.openslx.filetransfer; +import java.io.DataInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.io.RandomAccessFile; import java.net.Socket; import javax.net.ssl.SSLContext; +import net.jpountz.lz4.LZ4FastDecompressor; + import org.apache.log4j.Logger; public class Downloader extends Transfer { private static final Logger log = Logger.getLogger( Downloader.class ); - + + private final LZ4FastDecompressor decompressor = lz4factory.fastDecompressor(); + + private final Lz4InStream compressedIn; + /***********************************************************************/ /** * Actively initiate a connection to a remote peer for downloading. @@ -26,6 +34,7 @@ public class Downloader extends Transfer public Downloader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException { super( host, port, readTimeoutMs, context, log ); + compressedIn = new Lz4InStream( dataFromServer ); outStream.writeByte( 'D' ); if ( !sendToken( token ) || !sendEndOfMeta() ) throw new IOException( "Sending token failed" ); @@ -41,6 +50,7 @@ public class Downloader extends Transfer protected Downloader( Socket socket ) throws IOException { super( socket, log ); + compressedIn = new Lz4InStream( dataFromServer ); } /** @@ -81,6 +91,62 @@ public class Downloader extends Transfer Transfer.safeClose( file ); } } + + private class Lz4InStream extends InputStream + { + private final DataInputStream parentStream; + + private long compressed, uncompressed; + + private byte[] buffer; + + public Lz4InStream( DataInputStream in ) + { + parentStream = in; + log.info( "DeCompressor: " + decompressor.getClass().getSimpleName() ); + } + + @Override + public int read( byte b[], int off, int len ) throws IOException + { + try { + int decompressedLength = parentStream.readInt(); + int compressedLength = parentStream.readInt(); + compressed += compressedLength; + uncompressed += decompressedLength; + if ( decompressedLength > len ) { + throw new RuntimeException( "This should never happen! ;)" ); + } + if ( decompressedLength == compressedLength ) { + parentStream.readFully( b, off, decompressedLength ); + } else { + // Compressed + if ( buffer == null || buffer.length < compressedLength ) { + buffer = new byte[ compressedLength ]; + } + parentStream.readFully( buffer, 0, compressedLength ); + decompressor.decompress( buffer, 0, b, off, decompressedLength ); + } + return decompressedLength; + } catch ( Throwable e ) { + throw new IOException( e ); + } + } + + @Override + public int read() throws IOException + { + throw new UnsupportedOperationException( "Cant do this!" ); + } + + public void printStats() + { + if ( compressed == 0 ) + return; + log.info( "Received bytes: " + compressed + ", decompressed bytes: " + uncompressed ); + } + + } /** * Initiate the download. This method does not return until the file transfer finished. @@ -92,6 +158,7 @@ public class Downloader extends Transfer * more parts are needed, which in turn let's this method return true * @return true on success, false otherwise */ + @SuppressWarnings( "resource" ) public boolean download( DataReceivedCallback dataCallback, WantRangeCallback rangeCallback ) { if ( shouldGetToken() ) { @@ -105,6 +172,10 @@ public class Downloader extends Transfer log.error( "Callback supplied bad range (" + requestedRange.startOffset + " to " + requestedRange.endOffset + ")" ); return false; } + if ( useCompression ) { + // Request compressed transfer + sendUseCompression(); + } // Send range request if ( !sendRange( requestedRange.startOffset, requestedRange.endOffset ) || !sendEndOfMeta() ) { log.error( "Could not send next range request, download failed." ); @@ -123,12 +194,14 @@ public class Downloader extends Transfer } // Receive requested range int chunkLength = requestedRange.getLength(); + // If the uploader sets the COMPRESS field, assume compressed chunk + InputStream inStream = meta.peerWantsCompression() ? compressedIn : dataFromServer; byte[] incoming = new byte[ 500000 ]; // 500kb int hasRead = 0; while ( hasRead < chunkLength ) { int ret; try { - ret = dataFromServer.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) ); + ret = inStream.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) ); if ( Thread.currentThread().isInterrupted() ) { log.debug( "Thread interrupted in download loop" ); return false; @@ -151,6 +224,7 @@ public class Downloader extends Transfer } sendDone(); sendEndOfMeta(); + compressedIn.printStats(); try { transferSocket.shutdownOutput(); } catch ( IOException e ) { |