diff options
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/Uploader.java')
-rw-r--r-- | src/main/java/org/openslx/filetransfer/Uploader.java | 78 |
1 files changed, 75 insertions, 3 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java index 6766ba7..e82955b 100644 --- a/src/main/java/org/openslx/filetransfer/Uploader.java +++ b/src/main/java/org/openslx/filetransfer/Uploader.java @@ -1,20 +1,28 @@ package org.openslx.filetransfer; +import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStream; import java.io.RandomAccessFile; import java.net.Socket; import javax.net.ssl.SSLContext; +import net.jpountz.lz4.LZ4Compressor; + import org.apache.log4j.Logger; public class Uploader extends Transfer { private static final Logger log = Logger.getLogger( Uploader.class ); - + + private final LZ4Compressor compressor = lz4factory.fastCompressor(); + + private final Lz4OutStream compressedOut; + /***********************************************************************/ /** * Actively establish upload connection to given peer. @@ -27,6 +35,7 @@ public class Uploader extends Transfer public Uploader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException { super( host, port, readTimeoutMs, context, log ); + compressedOut = new Lz4OutStream( outStream ); outStream.writeByte( 'U' ); if ( !sendToken( token ) || !sendEndOfMeta() ) throw new IOException( "Sending token failed" ); @@ -42,6 +51,7 @@ public class Uploader extends Transfer public Uploader( Socket socket ) throws IOException { super( socket, log ); + compressedOut = new Lz4OutStream( outStream ); } /***********************************************************************/ @@ -54,6 +64,62 @@ public class Uploader extends Transfer { return upload( filename, null ); } + + private class Lz4OutStream extends OutputStream + { + + private final DataOutputStream parentStream; + + private byte[] buffer; + + private long compressed, uncompressed; + + private int chunksCompressed, chunksUncompressed; + + public Lz4OutStream( DataOutputStream out ) + { + parentStream = out; + log.info( "Compressor: " + compressor.getClass().getSimpleName() ); + } + + @Override + public void write( int b ) throws IOException + { + throw new UnsupportedOperationException( "Cannot do this" ); + } + + @Override + public void write( byte[] data, int off, int decompressedLength ) throws IOException + { + int maxCompressedLength = compressor.maxCompressedLength( decompressedLength ); + if ( buffer == null || buffer.length < maxCompressedLength ) { + buffer = new byte[ maxCompressedLength ]; + } + uncompressed += decompressedLength; + int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength ); + parentStream.writeInt( decompressedLength ); + if ( ( compressedLength * 9 / 8 ) < decompressedLength ) { + compressed += compressedLength; + chunksCompressed++; + parentStream.writeInt( compressedLength ); + parentStream.write( buffer, 0, compressedLength ); + } else { + compressed += decompressedLength; + chunksUncompressed++; + parentStream.writeInt( decompressedLength ); + parentStream.write( data, off, decompressedLength ); + } + } + + public void printStats() + { + if ( compressed == 0 ) + return; + log.info( "Sent bytes: " + compressed + ", decompressed bytes: " + uncompressed ); + log.info( "Sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed ); + } + + } @SuppressWarnings( "resource" ) public boolean upload( String filename, UploadStatusCallback callback ) @@ -102,8 +168,13 @@ public class Uploader extends Transfer this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true ); return false; } - // Send confirmation of range we're about to send + // Send confirmation of range and compression mode we're about to send + OutputStream outStr = outStream; try { + if ( meta.peerWantsCompression() && useCompression ) { + sendUseCompression(); + outStr = compressedOut; + } long ptr = file.getFilePointer(); if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) { this.close( "Could not send range confirmation" ); @@ -131,7 +202,7 @@ public class Uploader extends Transfer } hasRead += ret; try { - outStream.write( data, 0, ret ); + outStr.write( data, 0, ret ); } catch ( IOException e ) { this.close( "Sending payload failed" ); return false; @@ -142,6 +213,7 @@ public class Uploader extends Transfer } } finally { Transfer.safeClose( file, transferSocket ); + compressedOut.printStats(); } return true; } |