summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/Uploader.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/Uploader.java')
-rw-r--r--src/main/java/org/openslx/filetransfer/Uploader.java78
1 files changed, 75 insertions, 3 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java
index 6766ba7..e82955b 100644
--- a/src/main/java/org/openslx/filetransfer/Uploader.java
+++ b/src/main/java/org/openslx/filetransfer/Uploader.java
@@ -1,20 +1,28 @@
package org.openslx.filetransfer;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;
import javax.net.ssl.SSLContext;
+import net.jpountz.lz4.LZ4Compressor;
+
import org.apache.log4j.Logger;
public class Uploader extends Transfer
{
private static final Logger log = Logger.getLogger( Uploader.class );
-
+
+ private final LZ4Compressor compressor = lz4factory.fastCompressor();
+
+ private final Lz4OutStream compressedOut;
+
/***********************************************************************/
/**
* Actively establish upload connection to given peer.
@@ -27,6 +35,7 @@ public class Uploader extends Transfer
public Uploader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException
{
super( host, port, readTimeoutMs, context, log );
+ compressedOut = new Lz4OutStream( outStream );
outStream.writeByte( 'U' );
if ( !sendToken( token ) || !sendEndOfMeta() )
throw new IOException( "Sending token failed" );
@@ -42,6 +51,7 @@ public class Uploader extends Transfer
public Uploader( Socket socket ) throws IOException
{
super( socket, log );
+ compressedOut = new Lz4OutStream( outStream );
}
/***********************************************************************/
@@ -54,6 +64,62 @@ public class Uploader extends Transfer
{
return upload( filename, null );
}
+
+ private class Lz4OutStream extends OutputStream
+ {
+
+ private final DataOutputStream parentStream;
+
+ private byte[] buffer;
+
+ private long compressed, uncompressed;
+
+ private int chunksCompressed, chunksUncompressed;
+
+ public Lz4OutStream( DataOutputStream out )
+ {
+ parentStream = out;
+ log.info( "Compressor: " + compressor.getClass().getSimpleName() );
+ }
+
+ @Override
+ public void write( int b ) throws IOException
+ {
+ throw new UnsupportedOperationException( "Cannot do this" );
+ }
+
+ @Override
+ public void write( byte[] data, int off, int decompressedLength ) throws IOException
+ {
+ int maxCompressedLength = compressor.maxCompressedLength( decompressedLength );
+ if ( buffer == null || buffer.length < maxCompressedLength ) {
+ buffer = new byte[ maxCompressedLength ];
+ }
+ uncompressed += decompressedLength;
+ int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength );
+ parentStream.writeInt( decompressedLength );
+ if ( ( compressedLength * 9 / 8 ) < decompressedLength ) {
+ compressed += compressedLength;
+ chunksCompressed++;
+ parentStream.writeInt( compressedLength );
+ parentStream.write( buffer, 0, compressedLength );
+ } else {
+ compressed += decompressedLength;
+ chunksUncompressed++;
+ parentStream.writeInt( decompressedLength );
+ parentStream.write( data, off, decompressedLength );
+ }
+ }
+
+ public void printStats()
+ {
+ if ( compressed == 0 )
+ return;
+ log.info( "Sent bytes: " + compressed + ", decompressed bytes: " + uncompressed );
+ log.info( "Sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed );
+ }
+
+ }
@SuppressWarnings( "resource" )
public boolean upload( String filename, UploadStatusCallback callback )
@@ -102,8 +168,13 @@ public class Uploader extends Transfer
this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true );
return false;
}
- // Send confirmation of range we're about to send
+ // Send confirmation of range and compression mode we're about to send
+ OutputStream outStr = outStream;
try {
+ if ( meta.peerWantsCompression() && useCompression ) {
+ sendUseCompression();
+ outStr = compressedOut;
+ }
long ptr = file.getFilePointer();
if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) {
this.close( "Could not send range confirmation" );
@@ -131,7 +202,7 @@ public class Uploader extends Transfer
}
hasRead += ret;
try {
- outStream.write( data, 0, ret );
+ outStr.write( data, 0, ret );
} catch ( IOException e ) {
this.close( "Sending payload failed" );
return false;
@@ -142,6 +213,7 @@ public class Uploader extends Transfer
}
} finally {
Transfer.safeClose( file, transferSocket );
+ compressedOut.printStats();
}
return true;
}