summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2023-05-24 16:25:37 +0200
committerSimon Rettberg2023-05-24 16:25:37 +0200
commit870ed89ca71e65f0d365d929bd2d447f3b3342c0 (patch)
treebeebfdc3353f6eafc2ca41b3290269d9581bd5b6
parentThiriftHelper: Discard connections from pool after idling for too long (diff)
downloadmaster-sync-shared-870ed89ca71e65f0d365d929bd2d447f3b3342c0.tar.gz
master-sync-shared-870ed89ca71e65f0d365d929bd2d447f3b3342c0.tar.xz
master-sync-shared-870ed89ca71e65f0d365d929bd2d447f3b3342c0.zip
BFTP: Minor cleanup, comments, refactor
-rw-r--r--src/main/java/org/openslx/filetransfer/Listener.java32
-rw-r--r--src/main/java/org/openslx/filetransfer/Uploader.java19
-rw-r--r--src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java10
3 files changed, 36 insertions, 25 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java
index 0d5921a..bee650c 100644
--- a/src/main/java/org/openslx/filetransfer/Listener.java
+++ b/src/main/java/org/openslx/filetransfer/Listener.java
@@ -11,6 +11,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
import javax.net.ssl.SSLServerSocketFactory;
import org.apache.logging.log4j.LogManager;
@@ -28,8 +29,8 @@ public class Listener
private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(),
new PrioThreadFactory( "BFTP-Init" ) );
- private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // hex - code 'U' = 85.
- private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // hex - code 'D' = 68.
+ private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // ASCII 'U' = 85.
+ private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // ASCII 'D' = 68.
private static Logger log = LogManager.getLogger( Listener.class );
/***********************************************************************/
@@ -68,8 +69,10 @@ public class Listener
SSLServerSocketFactory sslServerSocketFactory = context.getServerSocketFactory();
listenSocket = sslServerSocketFactory.createServerSocket();
}
+ listenSocket.setSoTimeout( 5000 );
listenSocket.setReuseAddress( true );
listenSocket.bind( new InetSocketAddress( this.port ) );
+ listenSocket.setSoTimeout( 0 );
} catch ( Exception e ) {
log.error( "Cannot listen on port " + this.port, e );
listenSocket = null;
@@ -83,16 +86,16 @@ public class Listener
if ( acceptThread != null )
return;
final Listener instance = this;
- acceptThread = new Thread( "BFTP-Listen-" + this.port ) {
+ acceptThread = new Thread( "BFTP:" + this.port ) {
@Override
public void run()
{
try {
// Run accept loop in own thread
while ( !isInterrupted() ) {
- Socket acceptedSocket = null;
+ final Socket connection;
try {
- acceptedSocket = listenSocket.accept();
+ connection = listenSocket.accept();
} catch ( SocketTimeoutException e ) {
continue;
} catch ( Exception e ) {
@@ -106,14 +109,12 @@ public class Listener
continue;
}
// Handle each accepted connection in a thread pool
- final Socket connection = acceptedSocket;
Runnable handler = new Runnable() {
@Override
public void run()
{
-
try {
- // Give initial byte signalling mode of operation 5 secs to arrive
+ // Give initial byte signaling mode of operation 5 secs to arrive
connection.setSoTimeout( 5000 );
byte[] b = new byte[ 1 ];
@@ -128,29 +129,30 @@ public class Listener
if ( b[0] == CONNECTING_PEER_WANTS_TO_UPLOAD ) {
// --> start Downloader(socket).
Downloader d = new Downloader( connection );
+ // Will take care of connection cleanup
incomingEvent.incomingUploadRequest( d );
} else if ( b[0] == CONNECTING_PEER_WANTS_TO_DOWNLOAD ) {
// --> start Uploader(socket).
Uploader u = new Uploader( connection );
+ // Will take care of connection cleanup
incomingEvent.incomingDownloadRequest( u );
} else {
- log.debug( "Got invalid init-byte ... close connection" );
+ log.debug( "Got invalid init-byte ... closing connection" );
Transfer.safeClose( connection );
}
+ } catch ( SSLException e ) {
+ Transfer.safeClose( connection );
+ log.warn( "SSL error when acceping client " + connection.getInetAddress().getHostAddress() );
} catch ( Exception e ) {
- String m = e.getMessage();
- if ( !m.contains( "Remote host terminated the handshake" )
- && !m.contains( "Unsupported or unrecognized SSL message" ) ) {
- log.warn( "Error accepting client", e );
- }
Transfer.safeClose( connection );
+ log.warn( "Error handling client", e );
}
}
};
try {
processingPool.execute( handler );
} catch ( RejectedExecutionException e ) {
- Transfer.safeClose( acceptedSocket );
+ Transfer.safeClose( connection );
}
}
} finally {
diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java
index ed6e972..a1ad313 100644
--- a/src/main/java/org/openslx/filetransfer/Uploader.java
+++ b/src/main/java/org/openslx/filetransfer/Uploader.java
@@ -66,6 +66,10 @@ public class Uploader extends Transfer
return upload( filename, null );
}
+ /**
+ * Compressing output stream that will either write LZ4-compressed data, or if the data
+ * doesn't compress well, just the original uncompressed data.
+ */
private class Lz4OutStream extends OutputStream
{
@@ -73,7 +77,7 @@ public class Uploader extends Transfer
private byte[] buffer;
- private long compressed, uncompressed;
+ private long bytesSentTotal, bytesDecompressedTotal;
private int chunksCompressed, chunksUncompressed;
@@ -96,16 +100,17 @@ public class Uploader extends Transfer
if ( buffer == null || buffer.length < maxCompressedLength ) {
buffer = new byte[ maxCompressedLength ];
}
- uncompressed += decompressedLength;
+ bytesDecompressedTotal += decompressedLength;
int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength );
parentStream.writeInt( decompressedLength );
+ // Only send compressed data if we got down to at least ~88% the original size
if ( ( compressedLength * 9 / 8 ) < decompressedLength ) {
- compressed += compressedLength;
+ bytesSentTotal += compressedLength;
chunksCompressed++;
parentStream.writeInt( compressedLength );
parentStream.write( buffer, 0, compressedLength );
} else {
- compressed += decompressedLength;
+ bytesSentTotal += decompressedLength;
chunksUncompressed++;
parentStream.writeInt( decompressedLength );
parentStream.write( data, off, decompressedLength );
@@ -114,10 +119,10 @@ public class Uploader extends Transfer
public void printStats()
{
- if ( compressed == 0 )
+ if ( bytesSentTotal == 0 )
return;
- log.info( "Sent bytes: " + compressed + ", decompressed bytes: " + uncompressed );
- log.info( "Sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed );
+ log.info( "Bytes sent: " + bytesSentTotal + ", decompressed to: " + bytesDecompressedTotal );
+ log.info( "Chunks sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed );
}
}
diff --git a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
index 15c86fb..ad2e96c 100644
--- a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
@@ -75,9 +75,13 @@ public abstract class OutgoingTransferBase extends AbstractTransfer
@Override
public void run()
{
- boolean ret = connection.upload( sourceFile.getAbsolutePath() );
- synchronized ( uploads ) {
- uploads.remove( connection );
+ boolean ret = false;
+ try {
+ ret = connection.upload( sourceFile.getAbsolutePath() );
+ } finally {
+ synchronized ( uploads ) {
+ uploads.remove( connection );
+ }
}
if ( ret ) {
connectFails.set( 0 );