summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/Downloader.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/Downloader.java')
-rw-r--r--src/main/java/org/openslx/filetransfer/Downloader.java78
1 files changed, 76 insertions, 2 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Downloader.java b/src/main/java/org/openslx/filetransfer/Downloader.java
index cb933af..50162fc 100644
--- a/src/main/java/org/openslx/filetransfer/Downloader.java
+++ b/src/main/java/org/openslx/filetransfer/Downloader.java
@@ -1,20 +1,28 @@
package org.openslx.filetransfer;
+import java.io.DataInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.Socket;
import javax.net.ssl.SSLContext;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
import org.apache.log4j.Logger;
public class Downloader extends Transfer
{
private static final Logger log = Logger.getLogger( Downloader.class );
-
+
+ private final LZ4FastDecompressor decompressor = lz4factory.fastDecompressor();
+
+ private final Lz4InStream compressedIn;
+
/***********************************************************************/
/**
* Actively initiate a connection to a remote peer for downloading.
@@ -26,6 +34,7 @@ public class Downloader extends Transfer
public Downloader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException
{
super( host, port, readTimeoutMs, context, log );
+ compressedIn = new Lz4InStream( dataFromServer );
outStream.writeByte( 'D' );
if ( !sendToken( token ) || !sendEndOfMeta() )
throw new IOException( "Sending token failed" );
@@ -41,6 +50,7 @@ public class Downloader extends Transfer
protected Downloader( Socket socket ) throws IOException
{
super( socket, log );
+ compressedIn = new Lz4InStream( dataFromServer );
}
/**
@@ -81,6 +91,62 @@ public class Downloader extends Transfer
Transfer.safeClose( file );
}
}
+
+ private class Lz4InStream extends InputStream
+ {
+ private final DataInputStream parentStream;
+
+ private long compressed, uncompressed;
+
+ private byte[] buffer;
+
+ public Lz4InStream( DataInputStream in )
+ {
+ parentStream = in;
+ log.info( "DeCompressor: " + decompressor.getClass().getSimpleName() );
+ }
+
+ @Override
+ public int read( byte b[], int off, int len ) throws IOException
+ {
+ try {
+ int decompressedLength = parentStream.readInt();
+ int compressedLength = parentStream.readInt();
+ compressed += compressedLength;
+ uncompressed += decompressedLength;
+ if ( decompressedLength > len ) {
+ throw new RuntimeException( "This should never happen! ;)" );
+ }
+ if ( decompressedLength == compressedLength ) {
+ parentStream.readFully( b, off, decompressedLength );
+ } else {
+ // Compressed
+ if ( buffer == null || buffer.length < compressedLength ) {
+ buffer = new byte[ compressedLength ];
+ }
+ parentStream.readFully( buffer, 0, compressedLength );
+ decompressor.decompress( buffer, 0, b, off, decompressedLength );
+ }
+ return decompressedLength;
+ } catch ( Throwable e ) {
+ throw new IOException( e );
+ }
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ throw new UnsupportedOperationException( "Cant do this!" );
+ }
+
+ public void printStats()
+ {
+ if ( compressed == 0 )
+ return;
+ log.info( "Received bytes: " + compressed + ", decompressed bytes: " + uncompressed );
+ }
+
+ }
/**
* Initiate the download. This method does not return until the file transfer finished.
@@ -92,6 +158,7 @@ public class Downloader extends Transfer
* more parts are needed, which in turn let's this method return true
* @return true on success, false otherwise
*/
+ @SuppressWarnings( "resource" )
public boolean download( DataReceivedCallback dataCallback, WantRangeCallback rangeCallback )
{
if ( shouldGetToken() ) {
@@ -105,6 +172,10 @@ public class Downloader extends Transfer
log.error( "Callback supplied bad range (" + requestedRange.startOffset + " to " + requestedRange.endOffset + ")" );
return false;
}
+ if ( useCompression ) {
+ // Request compressed transfer
+ sendUseCompression();
+ }
// Send range request
if ( !sendRange( requestedRange.startOffset, requestedRange.endOffset ) || !sendEndOfMeta() ) {
log.error( "Could not send next range request, download failed." );
@@ -123,12 +194,14 @@ public class Downloader extends Transfer
}
// Receive requested range
int chunkLength = requestedRange.getLength();
+ // If the uploader sets the COMPRESS field, assume compressed chunk
+ InputStream inStream = meta.peerWantsCompression() ? compressedIn : dataFromServer;
byte[] incoming = new byte[ 500000 ]; // 500kb
int hasRead = 0;
while ( hasRead < chunkLength ) {
int ret;
try {
- ret = dataFromServer.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) );
+ ret = inStream.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) );
if ( Thread.currentThread().isInterrupted() ) {
log.debug( "Thread interrupted in download loop" );
return false;
@@ -151,6 +224,7 @@ public class Downloader extends Transfer
}
sendDone();
sendEndOfMeta();
+ compressedIn.printStats();
try {
transferSocket.shutdownOutput();
} catch ( IOException e ) {