From e519e8edee76b0861f0684823f8f2a0cb9938ff3 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 16 May 2018 13:08:44 +0200 Subject: More speedup: LZ4 and empty chunk detection --- .../java/org/openslx/filetransfer/Downloader.java | 78 +++++++++++++++++++++- .../java/org/openslx/filetransfer/Listener.java | 4 +- .../java/org/openslx/filetransfer/Transfer.java | 25 +++++++ .../java/org/openslx/filetransfer/Uploader.java | 78 +++++++++++++++++++++- .../org/openslx/filetransfer/util/ChunkList.java | 22 +++++- .../org/openslx/filetransfer/util/FileChunk.java | 22 ++++++ .../filetransfer/util/IncomingTransferBase.java | 7 ++ 7 files changed, 228 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/main/java/org/openslx/filetransfer/Downloader.java b/src/main/java/org/openslx/filetransfer/Downloader.java index cb933af..50162fc 100644 --- a/src/main/java/org/openslx/filetransfer/Downloader.java +++ b/src/main/java/org/openslx/filetransfer/Downloader.java @@ -1,20 +1,28 @@ package org.openslx.filetransfer; +import java.io.DataInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.io.RandomAccessFile; import java.net.Socket; import javax.net.ssl.SSLContext; +import net.jpountz.lz4.LZ4FastDecompressor; + import org.apache.log4j.Logger; public class Downloader extends Transfer { private static final Logger log = Logger.getLogger( Downloader.class ); - + + private final LZ4FastDecompressor decompressor = lz4factory.fastDecompressor(); + + private final Lz4InStream compressedIn; + /***********************************************************************/ /** * Actively initiate a connection to a remote peer for downloading. @@ -26,6 +34,7 @@ public class Downloader extends Transfer public Downloader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException { super( host, port, readTimeoutMs, context, log ); + compressedIn = new Lz4InStream( dataFromServer ); outStream.writeByte( 'D' ); if ( !sendToken( token ) || !sendEndOfMeta() ) throw new IOException( "Sending token failed" ); @@ -41,6 +50,7 @@ public class Downloader extends Transfer protected Downloader( Socket socket ) throws IOException { super( socket, log ); + compressedIn = new Lz4InStream( dataFromServer ); } /** @@ -81,6 +91,62 @@ public class Downloader extends Transfer Transfer.safeClose( file ); } } + + private class Lz4InStream extends InputStream + { + private final DataInputStream parentStream; + + private long compressed, uncompressed; + + private byte[] buffer; + + public Lz4InStream( DataInputStream in ) + { + parentStream = in; + log.info( "DeCompressor: " + decompressor.getClass().getSimpleName() ); + } + + @Override + public int read( byte b[], int off, int len ) throws IOException + { + try { + int decompressedLength = parentStream.readInt(); + int compressedLength = parentStream.readInt(); + compressed += compressedLength; + uncompressed += decompressedLength; + if ( decompressedLength > len ) { + throw new RuntimeException( "This should never happen! ;)" ); + } + if ( decompressedLength == compressedLength ) { + parentStream.readFully( b, off, decompressedLength ); + } else { + // Compressed + if ( buffer == null || buffer.length < compressedLength ) { + buffer = new byte[ compressedLength ]; + } + parentStream.readFully( buffer, 0, compressedLength ); + decompressor.decompress( buffer, 0, b, off, decompressedLength ); + } + return decompressedLength; + } catch ( Throwable e ) { + throw new IOException( e ); + } + } + + @Override + public int read() throws IOException + { + throw new UnsupportedOperationException( "Cant do this!" ); + } + + public void printStats() + { + if ( compressed == 0 ) + return; + log.info( "Received bytes: " + compressed + ", decompressed bytes: " + uncompressed ); + } + + } /** * Initiate the download. This method does not return until the file transfer finished. @@ -92,6 +158,7 @@ public class Downloader extends Transfer * more parts are needed, which in turn let's this method return true * @return true on success, false otherwise */ + @SuppressWarnings( "resource" ) public boolean download( DataReceivedCallback dataCallback, WantRangeCallback rangeCallback ) { if ( shouldGetToken() ) { @@ -105,6 +172,10 @@ public class Downloader extends Transfer log.error( "Callback supplied bad range (" + requestedRange.startOffset + " to " + requestedRange.endOffset + ")" ); return false; } + if ( useCompression ) { + // Request compressed transfer + sendUseCompression(); + } // Send range request if ( !sendRange( requestedRange.startOffset, requestedRange.endOffset ) || !sendEndOfMeta() ) { log.error( "Could not send next range request, download failed." ); @@ -123,12 +194,14 @@ public class Downloader extends Transfer } // Receive requested range int chunkLength = requestedRange.getLength(); + // If the uploader sets the COMPRESS field, assume compressed chunk + InputStream inStream = meta.peerWantsCompression() ? compressedIn : dataFromServer; byte[] incoming = new byte[ 500000 ]; // 500kb int hasRead = 0; while ( hasRead < chunkLength ) { int ret; try { - ret = dataFromServer.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) ); + ret = inStream.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) ); if ( Thread.currentThread().isInterrupted() ) { log.debug( "Thread interrupted in download loop" ); return false; @@ -151,6 +224,7 @@ public class Downloader extends Transfer } sendDone(); sendEndOfMeta(); + compressedIn.printStats(); try { transferSocket.shutdownOutput(); } catch ( IOException e ) { diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java index f7d4225..a0fc172 100644 --- a/src/main/java/org/openslx/filetransfer/Listener.java +++ b/src/main/java/org/openslx/filetransfer/Listener.java @@ -14,6 +14,7 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLServerSocketFactory; import org.apache.log4j.Logger; +import org.openslx.util.PrioThreadFactory; public class Listener { @@ -23,7 +24,8 @@ public class Listener private ServerSocket listenSocket = null; private Thread acceptThread = null; private final int readTimeoutMs; - private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue() ); + private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue(), + new PrioThreadFactory( "BFTP-Init" ) ); private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // hex - code 'U' = 85. private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // hex - code 'D' = 68. diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java index 01c7227..589d142 100644 --- a/src/main/java/org/openslx/filetransfer/Transfer.java +++ b/src/main/java/org/openslx/filetransfer/Transfer.java @@ -13,6 +13,8 @@ import java.util.Map; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; +import net.jpountz.lz4.LZ4Factory; + import org.apache.log4j.Logger; import org.openslx.util.Util; @@ -23,9 +25,12 @@ public abstract class Transfer protected final DataInputStream dataFromServer; protected String ERROR = null; private boolean shouldGetToken; + protected boolean useCompression = true; protected final Logger log; + protected final static LZ4Factory lz4factory = LZ4Factory.fastestInstance(); + /** * Actively initiated transfer. * @@ -79,6 +84,15 @@ public abstract class Transfer } return true; } + + protected void sendUseCompression() + { + try { + sendKeyValuePair( "COMPRESS", "true" ); + } catch ( IOException e ) { + e.printStackTrace(); + } + } /***********************************************************************/ /** @@ -281,6 +295,9 @@ public abstract class Transfer MetaData meta = readMetaData(); if ( meta == null ) return null; + if (meta.peerWantsCompression()) { + useCompression = true; + } return meta.getToken(); } @@ -381,6 +398,14 @@ public abstract class Transfer return new FileRange( start, end ); } + /** + * Peer indicated that it wants to use snappy compression. + */ + public boolean peerWantsCompression() + { + return meta.containsKey( "COMPRESS" ); + } + } } diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java index 6766ba7..e82955b 100644 --- a/src/main/java/org/openslx/filetransfer/Uploader.java +++ b/src/main/java/org/openslx/filetransfer/Uploader.java @@ -1,20 +1,28 @@ package org.openslx.filetransfer; +import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStream; import java.io.RandomAccessFile; import java.net.Socket; import javax.net.ssl.SSLContext; +import net.jpountz.lz4.LZ4Compressor; + import org.apache.log4j.Logger; public class Uploader extends Transfer { private static final Logger log = Logger.getLogger( Uploader.class ); - + + private final LZ4Compressor compressor = lz4factory.fastCompressor(); + + private final Lz4OutStream compressedOut; + /***********************************************************************/ /** * Actively establish upload connection to given peer. @@ -27,6 +35,7 @@ public class Uploader extends Transfer public Uploader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException { super( host, port, readTimeoutMs, context, log ); + compressedOut = new Lz4OutStream( outStream ); outStream.writeByte( 'U' ); if ( !sendToken( token ) || !sendEndOfMeta() ) throw new IOException( "Sending token failed" ); @@ -42,6 +51,7 @@ public class Uploader extends Transfer public Uploader( Socket socket ) throws IOException { super( socket, log ); + compressedOut = new Lz4OutStream( outStream ); } /***********************************************************************/ @@ -54,6 +64,62 @@ public class Uploader extends Transfer { return upload( filename, null ); } + + private class Lz4OutStream extends OutputStream + { + + private final DataOutputStream parentStream; + + private byte[] buffer; + + private long compressed, uncompressed; + + private int chunksCompressed, chunksUncompressed; + + public Lz4OutStream( DataOutputStream out ) + { + parentStream = out; + log.info( "Compressor: " + compressor.getClass().getSimpleName() ); + } + + @Override + public void write( int b ) throws IOException + { + throw new UnsupportedOperationException( "Cannot do this" ); + } + + @Override + public void write( byte[] data, int off, int decompressedLength ) throws IOException + { + int maxCompressedLength = compressor.maxCompressedLength( decompressedLength ); + if ( buffer == null || buffer.length < maxCompressedLength ) { + buffer = new byte[ maxCompressedLength ]; + } + uncompressed += decompressedLength; + int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength ); + parentStream.writeInt( decompressedLength ); + if ( ( compressedLength * 9 / 8 ) < decompressedLength ) { + compressed += compressedLength; + chunksCompressed++; + parentStream.writeInt( compressedLength ); + parentStream.write( buffer, 0, compressedLength ); + } else { + compressed += decompressedLength; + chunksUncompressed++; + parentStream.writeInt( decompressedLength ); + parentStream.write( data, off, decompressedLength ); + } + } + + public void printStats() + { + if ( compressed == 0 ) + return; + log.info( "Sent bytes: " + compressed + ", decompressed bytes: " + uncompressed ); + log.info( "Sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed ); + } + + } @SuppressWarnings( "resource" ) public boolean upload( String filename, UploadStatusCallback callback ) @@ -102,8 +168,13 @@ public class Uploader extends Transfer this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true ); return false; } - // Send confirmation of range we're about to send + // Send confirmation of range and compression mode we're about to send + OutputStream outStr = outStream; try { + if ( meta.peerWantsCompression() && useCompression ) { + sendUseCompression(); + outStr = compressedOut; + } long ptr = file.getFilePointer(); if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) { this.close( "Could not send range confirmation" ); @@ -131,7 +202,7 @@ public class Uploader extends Transfer } hasRead += ret; try { - outStream.write( data, 0, ret ); + outStr.write( data, 0, ret ); } catch ( IOException e ) { this.close( "Sending payload failed" ); return false; @@ -142,6 +213,7 @@ public class Uploader extends Transfer } } finally { Transfer.safeClose( file, transferSocket ); + compressedOut.printStats(); } return true; } diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index 11f64e8..91d6f1e 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -69,8 +69,14 @@ public class ChunkList if ( index >= allChunks.size() ) break; if ( sum != null ) { - if ( allChunks.get( index ).setSha1Sum( sum ) && firstNew == -1 ) { - firstNew = index; + FileChunk chunk = allChunks.get( index ); + if ( chunk.setSha1Sum( sum ) ) { + if ( firstNew == -1 ) { + firstNew = index; + } + if ( chunk.status == ChunkStatus.MISSING && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, sum ) ) { + markMissingAsComplete( index ); + } } if ( !hasChecksum ) { hasChecksum = true; @@ -473,4 +479,16 @@ public class ChunkList return true; } + /** + * Returns true if the last chunk is exactly 16MiB and all zeros + * @return + */ + public boolean lastChunkIsZero() + { + if ( allChunks.isEmpty() ) + return false; + FileChunk chunk = allChunks.get( allChunks.size() - 1 ); + return chunk.sha1sum != null && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, chunk.sha1sum ); + } + } diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java index f302b3c..6594e31 100644 --- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java +++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java @@ -1,5 +1,6 @@ package org.openslx.filetransfer.util; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.zip.CRC32; @@ -28,6 +29,13 @@ public class FileChunk private boolean writtenToDisk = false; private ChunkSource localSource = null; + static final byte[] NULL_BLOCK_SHA1 = new byte[] { + 0x3b, 0x44, 0x17, (byte)0xfc, 0x42, 0x1c, (byte)0xee, 0x30, (byte)0xa9, (byte)0xad, 0x0f, + (byte)0xd9, 0x31, (byte)0x92, 0x20, (byte)0xa8, (byte)0xda, (byte)0xe3, 0x2d, (byte)0xa2 + }; + + static final long NULL_BLOCK_CRC32 = 2759631178l; + public FileChunk( long startOffset, long endOffset, byte[] sha1sum ) { this.range = new FileRange( startOffset, endOffset ); @@ -43,6 +51,20 @@ public class FileChunk if ( this.sha1sum != null || sha1sum == null || sha1sum.length != SHA1_LENGTH ) return false; this.sha1sum = sha1sum; + if ( Arrays.equals( sha1sum, NULL_BLOCK_SHA1 ) ) { + // + writtenToDisk = true; + if ( crc32 == null ) { + crc32 = new CRC32() { + @Override + public long getValue() + { + return NULL_BLOCK_CRC32; + } + }; + } + return true; + } if ( this.status == ChunkStatus.COMPLETE ) { this.status = ChunkStatus.HASHING; } diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 0355182..eaef63b 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -643,6 +643,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( state == TransferState.FINISHED ) { return; } + try { + if ( tmpFileHandle.length() < fileSize && chunks.lastChunkIsZero() ) { + tmpFileHandle.setLength( fileSize ); + } + } catch ( IOException e) { + LOGGER.warn( "Cannot extend file size to " + fileSize ); + } safeClose( tmpFileHandle ); if ( localCopyManager != null ) { localCopyManager.interrupt(); -- cgit v1.2.3-55-g7522