summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSimon Rettberg2018-05-16 13:08:44 +0200
committerSimon Rettberg2018-05-16 13:08:44 +0200
commite519e8edee76b0861f0684823f8f2a0cb9938ff3 (patch)
tree0380ebc565c0c17af8671b8f965c5015f8c16cf3 /src
parentAdd PrioThreadFactory (diff)
downloadmaster-sync-shared-e519e8edee76b0861f0684823f8f2a0cb9938ff3.tar.gz
master-sync-shared-e519e8edee76b0861f0684823f8f2a0cb9938ff3.tar.xz
master-sync-shared-e519e8edee76b0861f0684823f8f2a0cb9938ff3.zip
More speedup: LZ4 and empty chunk detection
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/openslx/filetransfer/Downloader.java78
-rw-r--r--src/main/java/org/openslx/filetransfer/Listener.java4
-rw-r--r--src/main/java/org/openslx/filetransfer/Transfer.java25
-rw-r--r--src/main/java/org/openslx/filetransfer/Uploader.java78
-rw-r--r--src/main/java/org/openslx/filetransfer/util/ChunkList.java22
-rw-r--r--src/main/java/org/openslx/filetransfer/util/FileChunk.java22
-rw-r--r--src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java7
7 files changed, 228 insertions, 8 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Downloader.java b/src/main/java/org/openslx/filetransfer/Downloader.java
index cb933af..50162fc 100644
--- a/src/main/java/org/openslx/filetransfer/Downloader.java
+++ b/src/main/java/org/openslx/filetransfer/Downloader.java
@@ -1,20 +1,28 @@
package org.openslx.filetransfer;
+import java.io.DataInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.Socket;
import javax.net.ssl.SSLContext;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
import org.apache.log4j.Logger;
public class Downloader extends Transfer
{
private static final Logger log = Logger.getLogger( Downloader.class );
-
+
+ private final LZ4FastDecompressor decompressor = lz4factory.fastDecompressor();
+
+ private final Lz4InStream compressedIn;
+
/***********************************************************************/
/**
* Actively initiate a connection to a remote peer for downloading.
@@ -26,6 +34,7 @@ public class Downloader extends Transfer
public Downloader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException
{
super( host, port, readTimeoutMs, context, log );
+ compressedIn = new Lz4InStream( dataFromServer );
outStream.writeByte( 'D' );
if ( !sendToken( token ) || !sendEndOfMeta() )
throw new IOException( "Sending token failed" );
@@ -41,6 +50,7 @@ public class Downloader extends Transfer
protected Downloader( Socket socket ) throws IOException
{
super( socket, log );
+ compressedIn = new Lz4InStream( dataFromServer );
}
/**
@@ -81,6 +91,62 @@ public class Downloader extends Transfer
Transfer.safeClose( file );
}
}
+
+ private class Lz4InStream extends InputStream
+ {
+ private final DataInputStream parentStream;
+
+ private long compressed, uncompressed;
+
+ private byte[] buffer;
+
+ public Lz4InStream( DataInputStream in )
+ {
+ parentStream = in;
+ log.info( "DeCompressor: " + decompressor.getClass().getSimpleName() );
+ }
+
+ @Override
+ public int read( byte b[], int off, int len ) throws IOException
+ {
+ try {
+ int decompressedLength = parentStream.readInt();
+ int compressedLength = parentStream.readInt();
+ compressed += compressedLength;
+ uncompressed += decompressedLength;
+ if ( decompressedLength > len ) {
+ throw new RuntimeException( "This should never happen! ;)" );
+ }
+ if ( decompressedLength == compressedLength ) {
+ parentStream.readFully( b, off, decompressedLength );
+ } else {
+ // Compressed
+ if ( buffer == null || buffer.length < compressedLength ) {
+ buffer = new byte[ compressedLength ];
+ }
+ parentStream.readFully( buffer, 0, compressedLength );
+ decompressor.decompress( buffer, 0, b, off, decompressedLength );
+ }
+ return decompressedLength;
+ } catch ( Throwable e ) {
+ throw new IOException( e );
+ }
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ throw new UnsupportedOperationException( "Cant do this!" );
+ }
+
+ public void printStats()
+ {
+ if ( compressed == 0 )
+ return;
+ log.info( "Received bytes: " + compressed + ", decompressed bytes: " + uncompressed );
+ }
+
+ }
/**
* Initiate the download. This method does not return until the file transfer finished.
@@ -92,6 +158,7 @@ public class Downloader extends Transfer
* more parts are needed, which in turn let's this method return true
* @return true on success, false otherwise
*/
+ @SuppressWarnings( "resource" )
public boolean download( DataReceivedCallback dataCallback, WantRangeCallback rangeCallback )
{
if ( shouldGetToken() ) {
@@ -105,6 +172,10 @@ public class Downloader extends Transfer
log.error( "Callback supplied bad range (" + requestedRange.startOffset + " to " + requestedRange.endOffset + ")" );
return false;
}
+ if ( useCompression ) {
+ // Request compressed transfer
+ sendUseCompression();
+ }
// Send range request
if ( !sendRange( requestedRange.startOffset, requestedRange.endOffset ) || !sendEndOfMeta() ) {
log.error( "Could not send next range request, download failed." );
@@ -123,12 +194,14 @@ public class Downloader extends Transfer
}
// Receive requested range
int chunkLength = requestedRange.getLength();
+ // If the uploader sets the COMPRESS field, assume compressed chunk
+ InputStream inStream = meta.peerWantsCompression() ? compressedIn : dataFromServer;
byte[] incoming = new byte[ 500000 ]; // 500kb
int hasRead = 0;
while ( hasRead < chunkLength ) {
int ret;
try {
- ret = dataFromServer.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) );
+ ret = inStream.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) );
if ( Thread.currentThread().isInterrupted() ) {
log.debug( "Thread interrupted in download loop" );
return false;
@@ -151,6 +224,7 @@ public class Downloader extends Transfer
}
sendDone();
sendEndOfMeta();
+ compressedIn.printStats();
try {
transferSocket.shutdownOutput();
} catch ( IOException e ) {
diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java
index f7d4225..a0fc172 100644
--- a/src/main/java/org/openslx/filetransfer/Listener.java
+++ b/src/main/java/org/openslx/filetransfer/Listener.java
@@ -14,6 +14,7 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocketFactory;
import org.apache.log4j.Logger;
+import org.openslx.util.PrioThreadFactory;
public class Listener
{
@@ -23,7 +24,8 @@ public class Listener
private ServerSocket listenSocket = null;
private Thread acceptThread = null;
private final int readTimeoutMs;
- private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
+ private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(),
+ new PrioThreadFactory( "BFTP-Init" ) );
private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // hex - code 'U' = 85.
private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // hex - code 'D' = 68.
diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java
index 01c7227..589d142 100644
--- a/src/main/java/org/openslx/filetransfer/Transfer.java
+++ b/src/main/java/org/openslx/filetransfer/Transfer.java
@@ -13,6 +13,8 @@ import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
+import net.jpountz.lz4.LZ4Factory;
+
import org.apache.log4j.Logger;
import org.openslx.util.Util;
@@ -23,9 +25,12 @@ public abstract class Transfer
protected final DataInputStream dataFromServer;
protected String ERROR = null;
private boolean shouldGetToken;
+ protected boolean useCompression = true;
protected final Logger log;
+ protected final static LZ4Factory lz4factory = LZ4Factory.fastestInstance();
+
/**
* Actively initiated transfer.
*
@@ -79,6 +84,15 @@ public abstract class Transfer
}
return true;
}
+
+ protected void sendUseCompression()
+ {
+ try {
+ sendKeyValuePair( "COMPRESS", "true" );
+ } catch ( IOException e ) {
+ e.printStackTrace();
+ }
+ }
/***********************************************************************/
/**
@@ -281,6 +295,9 @@ public abstract class Transfer
MetaData meta = readMetaData();
if ( meta == null )
return null;
+ if (meta.peerWantsCompression()) {
+ useCompression = true;
+ }
return meta.getToken();
}
@@ -381,6 +398,14 @@ public abstract class Transfer
return new FileRange( start, end );
}
+ /**
+ * Peer indicated that it wants to use snappy compression.
+ */
+ public boolean peerWantsCompression()
+ {
+ return meta.containsKey( "COMPRESS" );
+ }
+
}
}
diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java
index 6766ba7..e82955b 100644
--- a/src/main/java/org/openslx/filetransfer/Uploader.java
+++ b/src/main/java/org/openslx/filetransfer/Uploader.java
@@ -1,20 +1,28 @@
package org.openslx.filetransfer;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;
import javax.net.ssl.SSLContext;
+import net.jpountz.lz4.LZ4Compressor;
+
import org.apache.log4j.Logger;
public class Uploader extends Transfer
{
private static final Logger log = Logger.getLogger( Uploader.class );
-
+
+ private final LZ4Compressor compressor = lz4factory.fastCompressor();
+
+ private final Lz4OutStream compressedOut;
+
/***********************************************************************/
/**
* Actively establish upload connection to given peer.
@@ -27,6 +35,7 @@ public class Uploader extends Transfer
public Uploader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException
{
super( host, port, readTimeoutMs, context, log );
+ compressedOut = new Lz4OutStream( outStream );
outStream.writeByte( 'U' );
if ( !sendToken( token ) || !sendEndOfMeta() )
throw new IOException( "Sending token failed" );
@@ -42,6 +51,7 @@ public class Uploader extends Transfer
public Uploader( Socket socket ) throws IOException
{
super( socket, log );
+ compressedOut = new Lz4OutStream( outStream );
}
/***********************************************************************/
@@ -54,6 +64,62 @@ public class Uploader extends Transfer
{
return upload( filename, null );
}
+
+ private class Lz4OutStream extends OutputStream
+ {
+
+ private final DataOutputStream parentStream;
+
+ private byte[] buffer;
+
+ private long compressed, uncompressed;
+
+ private int chunksCompressed, chunksUncompressed;
+
+ public Lz4OutStream( DataOutputStream out )
+ {
+ parentStream = out;
+ log.info( "Compressor: " + compressor.getClass().getSimpleName() );
+ }
+
+ @Override
+ public void write( int b ) throws IOException
+ {
+ throw new UnsupportedOperationException( "Cannot do this" );
+ }
+
+ @Override
+ public void write( byte[] data, int off, int decompressedLength ) throws IOException
+ {
+ int maxCompressedLength = compressor.maxCompressedLength( decompressedLength );
+ if ( buffer == null || buffer.length < maxCompressedLength ) {
+ buffer = new byte[ maxCompressedLength ];
+ }
+ uncompressed += decompressedLength;
+ int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength );
+ parentStream.writeInt( decompressedLength );
+ if ( ( compressedLength * 9 / 8 ) < decompressedLength ) {
+ compressed += compressedLength;
+ chunksCompressed++;
+ parentStream.writeInt( compressedLength );
+ parentStream.write( buffer, 0, compressedLength );
+ } else {
+ compressed += decompressedLength;
+ chunksUncompressed++;
+ parentStream.writeInt( decompressedLength );
+ parentStream.write( data, off, decompressedLength );
+ }
+ }
+
+ public void printStats()
+ {
+ if ( compressed == 0 )
+ return;
+ log.info( "Sent bytes: " + compressed + ", decompressed bytes: " + uncompressed );
+ log.info( "Sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed );
+ }
+
+ }
@SuppressWarnings( "resource" )
public boolean upload( String filename, UploadStatusCallback callback )
@@ -102,8 +168,13 @@ public class Uploader extends Transfer
this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true );
return false;
}
- // Send confirmation of range we're about to send
+ // Send confirmation of range and compression mode we're about to send
+ OutputStream outStr = outStream;
try {
+ if ( meta.peerWantsCompression() && useCompression ) {
+ sendUseCompression();
+ outStr = compressedOut;
+ }
long ptr = file.getFilePointer();
if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) {
this.close( "Could not send range confirmation" );
@@ -131,7 +202,7 @@ public class Uploader extends Transfer
}
hasRead += ret;
try {
- outStream.write( data, 0, ret );
+ outStr.write( data, 0, ret );
} catch ( IOException e ) {
this.close( "Sending payload failed" );
return false;
@@ -142,6 +213,7 @@ public class Uploader extends Transfer
}
} finally {
Transfer.safeClose( file, transferSocket );
+ compressedOut.printStats();
}
return true;
}
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index 11f64e8..91d6f1e 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -69,8 +69,14 @@ public class ChunkList
if ( index >= allChunks.size() )
break;
if ( sum != null ) {
- if ( allChunks.get( index ).setSha1Sum( sum ) && firstNew == -1 ) {
- firstNew = index;
+ FileChunk chunk = allChunks.get( index );
+ if ( chunk.setSha1Sum( sum ) ) {
+ if ( firstNew == -1 ) {
+ firstNew = index;
+ }
+ if ( chunk.status == ChunkStatus.MISSING && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, sum ) ) {
+ markMissingAsComplete( index );
+ }
}
if ( !hasChecksum ) {
hasChecksum = true;
@@ -473,4 +479,16 @@ public class ChunkList
return true;
}
+ /**
+ * Returns true if the last chunk is exactly 16MiB and all zeros
+ * @return
+ */
+ public boolean lastChunkIsZero()
+ {
+ if ( allChunks.isEmpty() )
+ return false;
+ FileChunk chunk = allChunks.get( allChunks.size() - 1 );
+ return chunk.sha1sum != null && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, chunk.sha1sum );
+ }
+
}
diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
index f302b3c..6594e31 100644
--- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java
+++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
@@ -1,5 +1,6 @@
package org.openslx.filetransfer.util;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.zip.CRC32;
@@ -28,6 +29,13 @@ public class FileChunk
private boolean writtenToDisk = false;
private ChunkSource localSource = null;
+ static final byte[] NULL_BLOCK_SHA1 = new byte[] {
+ 0x3b, 0x44, 0x17, (byte)0xfc, 0x42, 0x1c, (byte)0xee, 0x30, (byte)0xa9, (byte)0xad, 0x0f,
+ (byte)0xd9, 0x31, (byte)0x92, 0x20, (byte)0xa8, (byte)0xda, (byte)0xe3, 0x2d, (byte)0xa2
+ };
+
+ static final long NULL_BLOCK_CRC32 = 2759631178l;
+
public FileChunk( long startOffset, long endOffset, byte[] sha1sum )
{
this.range = new FileRange( startOffset, endOffset );
@@ -43,6 +51,20 @@ public class FileChunk
if ( this.sha1sum != null || sha1sum == null || sha1sum.length != SHA1_LENGTH )
return false;
this.sha1sum = sha1sum;
+ if ( Arrays.equals( sha1sum, NULL_BLOCK_SHA1 ) ) {
+ //
+ writtenToDisk = true;
+ if ( crc32 == null ) {
+ crc32 = new CRC32() {
+ @Override
+ public long getValue()
+ {
+ return NULL_BLOCK_CRC32;
+ }
+ };
+ }
+ return true;
+ }
if ( this.status == ChunkStatus.COMPLETE ) {
this.status = ChunkStatus.HASHING;
}
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index 0355182..eaef63b 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -643,6 +643,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( state == TransferState.FINISHED ) {
return;
}
+ try {
+ if ( tmpFileHandle.length() < fileSize && chunks.lastChunkIsZero() ) {
+ tmpFileHandle.setLength( fileSize );
+ }
+ } catch ( IOException e) {
+ LOGGER.warn( "Cannot extend file size to " + fileSize );
+ }
safeClose( tmpFileHandle );
if ( localCopyManager != null ) {
localCopyManager.interrupt();