diff options
author | Simon Rettberg | 2023-05-24 16:25:37 +0200 |
---|---|---|
committer | Simon Rettberg | 2023-05-24 16:25:37 +0200 |
commit | 870ed89ca71e65f0d365d929bd2d447f3b3342c0 (patch) | |
tree | beebfdc3353f6eafc2ca41b3290269d9581bd5b6 | |
parent | ThiriftHelper: Discard connections from pool after idling for too long (diff) | |
download | master-sync-shared-870ed89ca71e65f0d365d929bd2d447f3b3342c0.tar.gz master-sync-shared-870ed89ca71e65f0d365d929bd2d447f3b3342c0.tar.xz master-sync-shared-870ed89ca71e65f0d365d929bd2d447f3b3342c0.zip |
BFTP: Minor cleanup, comments, refactor
3 files changed, 36 insertions, 25 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java index 0d5921a..bee650c 100644 --- a/src/main/java/org/openslx/filetransfer/Listener.java +++ b/src/main/java/org/openslx/filetransfer/Listener.java @@ -11,6 +11,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLException; import javax.net.ssl.SSLServerSocketFactory; import org.apache.logging.log4j.LogManager; @@ -28,8 +29,8 @@ public class Listener private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new PrioThreadFactory( "BFTP-Init" ) ); - private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // hex - code 'U' = 85. - private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // hex - code 'D' = 68. + private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // ASCII 'U' = 85. + private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // ASCII 'D' = 68. private static Logger log = LogManager.getLogger( Listener.class ); /***********************************************************************/ @@ -68,8 +69,10 @@ public class Listener SSLServerSocketFactory sslServerSocketFactory = context.getServerSocketFactory(); listenSocket = sslServerSocketFactory.createServerSocket(); } + listenSocket.setSoTimeout( 5000 ); listenSocket.setReuseAddress( true ); listenSocket.bind( new InetSocketAddress( this.port ) ); + listenSocket.setSoTimeout( 0 ); } catch ( Exception e ) { log.error( "Cannot listen on port " + this.port, e ); listenSocket = null; @@ -83,16 +86,16 @@ public class Listener if ( acceptThread != null ) return; final Listener instance = this; - acceptThread = new Thread( "BFTP-Listen-" + this.port ) { + acceptThread = new Thread( "BFTP:" + this.port ) { @Override public void run() { try { // Run accept loop in own thread while ( !isInterrupted() ) { - Socket acceptedSocket = null; + final Socket connection; try { - acceptedSocket = listenSocket.accept(); + connection = listenSocket.accept(); } catch ( SocketTimeoutException e ) { continue; } catch ( Exception e ) { @@ -106,14 +109,12 @@ public class Listener continue; } // Handle each accepted connection in a thread pool - final Socket connection = acceptedSocket; Runnable handler = new Runnable() { @Override public void run() { - try { - // Give initial byte signalling mode of operation 5 secs to arrive + // Give initial byte signaling mode of operation 5 secs to arrive connection.setSoTimeout( 5000 ); byte[] b = new byte[ 1 ]; @@ -128,29 +129,30 @@ public class Listener if ( b[0] == CONNECTING_PEER_WANTS_TO_UPLOAD ) { // --> start Downloader(socket). Downloader d = new Downloader( connection ); + // Will take care of connection cleanup incomingEvent.incomingUploadRequest( d ); } else if ( b[0] == CONNECTING_PEER_WANTS_TO_DOWNLOAD ) { // --> start Uploader(socket). Uploader u = new Uploader( connection ); + // Will take care of connection cleanup incomingEvent.incomingDownloadRequest( u ); } else { - log.debug( "Got invalid init-byte ... close connection" ); + log.debug( "Got invalid init-byte ... closing connection" ); Transfer.safeClose( connection ); } + } catch ( SSLException e ) { + Transfer.safeClose( connection ); + log.warn( "SSL error when acceping client " + connection.getInetAddress().getHostAddress() ); } catch ( Exception e ) { - String m = e.getMessage(); - if ( !m.contains( "Remote host terminated the handshake" ) - && !m.contains( "Unsupported or unrecognized SSL message" ) ) { - log.warn( "Error accepting client", e ); - } Transfer.safeClose( connection ); + log.warn( "Error handling client", e ); } } }; try { processingPool.execute( handler ); } catch ( RejectedExecutionException e ) { - Transfer.safeClose( acceptedSocket ); + Transfer.safeClose( connection ); } } } finally { diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java index ed6e972..a1ad313 100644 --- a/src/main/java/org/openslx/filetransfer/Uploader.java +++ b/src/main/java/org/openslx/filetransfer/Uploader.java @@ -66,6 +66,10 @@ public class Uploader extends Transfer return upload( filename, null ); } + /** + * Compressing output stream that will either write LZ4-compressed data, or if the data + * doesn't compress well, just the original uncompressed data. + */ private class Lz4OutStream extends OutputStream { @@ -73,7 +77,7 @@ public class Uploader extends Transfer private byte[] buffer; - private long compressed, uncompressed; + private long bytesSentTotal, bytesDecompressedTotal; private int chunksCompressed, chunksUncompressed; @@ -96,16 +100,17 @@ public class Uploader extends Transfer if ( buffer == null || buffer.length < maxCompressedLength ) { buffer = new byte[ maxCompressedLength ]; } - uncompressed += decompressedLength; + bytesDecompressedTotal += decompressedLength; int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength ); parentStream.writeInt( decompressedLength ); + // Only send compressed data if we got down to at least ~88% the original size if ( ( compressedLength * 9 / 8 ) < decompressedLength ) { - compressed += compressedLength; + bytesSentTotal += compressedLength; chunksCompressed++; parentStream.writeInt( compressedLength ); parentStream.write( buffer, 0, compressedLength ); } else { - compressed += decompressedLength; + bytesSentTotal += decompressedLength; chunksUncompressed++; parentStream.writeInt( decompressedLength ); parentStream.write( data, off, decompressedLength ); @@ -114,10 +119,10 @@ public class Uploader extends Transfer public void printStats() { - if ( compressed == 0 ) + if ( bytesSentTotal == 0 ) return; - log.info( "Sent bytes: " + compressed + ", decompressed bytes: " + uncompressed ); - log.info( "Sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed ); + log.info( "Bytes sent: " + bytesSentTotal + ", decompressed to: " + bytesDecompressedTotal ); + log.info( "Chunks sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed ); } } diff --git a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java index 15c86fb..ad2e96c 100644 --- a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java @@ -75,9 +75,13 @@ public abstract class OutgoingTransferBase extends AbstractTransfer @Override public void run() { - boolean ret = connection.upload( sourceFile.getAbsolutePath() ); - synchronized ( uploads ) { - uploads.remove( connection ); + boolean ret = false; + try { + ret = connection.upload( sourceFile.getAbsolutePath() ); + } finally { + synchronized ( uploads ) { + uploads.remove( connection ); + } } if ( ret ) { connectFails.set( 0 ); |