summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2023-05-23 11:53:57 +0200
committerSimon Rettberg2023-05-23 11:54:50 +0200
commiteecd1577ef091007929c2b1e4b47494b1d1088d8 (patch)
tree1d268e481cd9e6040fe75fa2f7dba2914d618ff5
parentFix 'content is not allowed in prolog' (diff)
downloadmaster-sync-shared-eecd1577ef091007929c2b1e4b47494b1d1088d8.tar.gz
master-sync-shared-eecd1577ef091007929c2b1e4b47494b1d1088d8.tar.xz
master-sync-shared-eecd1577ef091007929c2b1e4b47494b1d1088d8.zip
ThiriftHelper: Discard connections from pool after idling for too long
This is to avoid problems with firewalls/NATs that discard state after a while and don't send RST packets back to the client.
-rw-r--r--src/main/java/org/openslx/thrifthelper/ThriftHandler.java135
1 files changed, 99 insertions, 36 deletions
diff --git a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java
index e53eff9..e73c0ec 100644
--- a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java
+++ b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java
@@ -7,6 +7,7 @@ import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
@@ -16,44 +17,83 @@ import org.apache.thrift.TServiceClient;
import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.transport.TTransportException;
import org.openslx.thrifthelper.ThriftManager.ErrorCallback;
+import org.openslx.util.QuickTimer;
+import org.openslx.util.QuickTimer.Task;
class ThriftHandler<T extends TServiceClient> implements InvocationHandler
{
private final static Logger LOGGER = LogManager.getLogger( ThriftHandler.class );
+
+ /**
+ * How long a client/connection should be allowed to be idle before we get rid of it and open a fresh one
+ */
+ private final static long MAX_IDLE_MS = 110_000;
protected interface WantClientCallback<T>
{
public T getNewClient();
}
- private final Deque<T> pool = new ArrayDeque<>();
+ /** Pool of (potentially) good connections, waiting for reuse */
+ private final Deque<TWrap<T>> pool = new ArrayDeque<>();
+ /** Old connections to be released by cleanup task */
+ private Deque<TWrap<T>> trash = new ArrayDeque<>();
- private final WantClientCallback<? extends T> callback;
+ private final WantClientCallback<? extends T> clientFactory;
private final ErrorCallback errorCallback;
private final Set<String> thriftMethods;
- public ThriftHandler( final Class<? extends T> clazz, WantClientCallback<? extends T> cb, ErrorCallback errCb )
+ public ThriftHandler( final Class<? extends T> clazz, WantClientCallback<? extends T> clientFactory, ErrorCallback errCb )
{
- errorCallback = errCb;
- callback = cb;
+ this.errorCallback = errCb;
+ this.clientFactory = clientFactory;
Set<String> tmpset = new HashSet<String>();
Method[] methods = clazz.getMethods();
+ // Iterate over all methods of this class
for ( int i = 0; i < methods.length; i++ ) {
boolean thrift = false;
+ // If a method throws some form of TException, consider it a potential method from the thrift interface
Class<?>[] type = methods[i].getExceptionTypes();
for ( int e = 0; e < type.length; e++ ) {
- if ( TException.class.isAssignableFrom( type[e] ) )
+ if ( TException.class.isAssignableFrom( type[e] ) ) {
thrift = true;
+ break;
+ }
}
String name = methods[i].getName();
if ( thrift && !name.startsWith( "send_" ) && !name.startsWith( "recv_" ) ) {
+ // Exclude the send_ and recv_ helpers
tmpset.add( name );
}
}
thriftMethods = Collections.unmodifiableSet( tmpset );
+ // Periodically scan for old connections, in case the application idles for extended periods of time...
+ QuickTimer.scheduleAtFixedDelay( new Task() {
+ @Override
+ public void fire()
+ {
+ Deque<TWrap<T>> newTrash = new ArrayDeque<>();
+ Deque<TWrap<T>> list;
+ long now = System.currentTimeMillis();
+ synchronized ( pool ) {
+ list = trash;
+ trash = newTrash;
+ for ( Iterator<TWrap<T>> it = pool.iterator(); it.hasNext(); ) {
+ TWrap<T> client = it.next();
+ if ( client.deadline < now ) {
+ list.add( client );
+ it.remove();
+ }
+ }
+ }
+ for ( TWrap<T> client : list ) {
+ freeClient( client );
+ }
+ }
+ }, MAX_IDLE_MS * 5, MAX_IDLE_MS * 5 );
}
@Override
@@ -65,32 +105,33 @@ class ThriftHandler<T extends TServiceClient> implements InvocationHandler
throw new IllegalAccessException( "Cannot call this method on a proxied thrift client" );
}
- T client = getClient();
+ TWrap<T> clientWrap = getClient();
try {
Throwable cause = null;
- for ( int i = 1;; i++ ) {
- if ( client != null ) {
- try {
- return method.invoke( client, args );
- } catch ( InvocationTargetException e ) {
- cause = e.getCause();
- if ( cause != null && ! ( cause instanceof TTransportException )
- && ! ( cause instanceof TProtocolException ) ) {
- throw cause;
- }
- freeClient( client );
- client = null;
- if ( cause == null ) {
- cause = e;
- }
+ for ( int i = 1; clientWrap != null; i++ ) {
+ try {
+ return method.invoke( clientWrap.client, args );
+ } catch ( InvocationTargetException e ) {
+ cause = e.getCause(); // Get original exception
+ if ( cause != null && ! ( cause instanceof TTransportException )
+ && ! ( cause instanceof TProtocolException ) ) {
+ // If it's not an exception potentially hinting at dead connection, just pass it on
+ throw cause;
+ }
+ // Get rid of broken connection
+ freeClient( clientWrap );
+ clientWrap = null;
+ if ( cause == null ) {
+ cause = e;
}
}
// Call the error callback. As long as true is returned, keep retrying
if ( !errorCallback.thriftError( i, method.getName(), cause ) ) {
break;
}
- if ( client == null ) {
- client = getClient();
+ // Apparently we should retry, get another client
+ if ( clientWrap == null ) {
+ clientWrap = getClient();
cause = null;
}
}
@@ -100,44 +141,66 @@ class ThriftHandler<T extends TServiceClient> implements InvocationHandler
throw cause;
throw new TTransportException( "Could not connect" );
} finally {
- returnClient( client );
+ returnClient( clientWrap );
}
}
- private void freeClient( T client )
+ private void freeClient( TWrap<T> client )
{
try {
- client.getInputProtocol().getTransport().close();
+ client.client.getInputProtocol().getTransport().close();
} catch ( Exception e ) {
}
try {
- client.getOutputProtocol().getTransport().close();
+ client.client.getOutputProtocol().getTransport().close();
} catch ( Exception e ) {
}
}
- private T getClient()
+ /**
+ * Get an available client connection. Prefer connection from pool,
+ * honoring the max idle time. If none are available, create a fresh
+ * client connection.
+ */
+ private TWrap<T> getClient()
{
- T client;
+ long now = System.currentTimeMillis();
synchronized ( pool ) {
- client = pool.poll();
- if ( client != null ) {
- return client;
+ TWrap<T> client;
+ while ( ( client = pool.poll() ) != null ) {
+ if ( client.deadline >= now )
+ return client; // Still fresh
+ trash.add( client );
}
}
- // Pool is closed, create new client
+ // No usable existing connection, create new client
LOGGER.debug( "Creating new thrift client" );
- return callback.getNewClient();
+ return new TWrap<T>( clientFactory.getNewClient() );
}
- private void returnClient( T client )
+ /**
+ * Return a client connection to the pool, updating its last
+ * use timestamp for proper idle timeout handling.
+ */
+ private void returnClient( TWrap<T> client )
{
if ( client == null )
return;
+ client.deadline = System.currentTimeMillis() + MAX_IDLE_MS;
synchronized ( pool ) {
pool.push( client );
}
}
+
+ private static class TWrap<T>
+ {
+ private final T client;
+ private long deadline;
+ public TWrap(T client)
+ {
+ this.client = client;
+ }
+ }
}