diff options
author | Simon Rettberg | 2023-05-23 11:53:57 +0200 |
---|---|---|
committer | Simon Rettberg | 2023-05-23 11:54:50 +0200 |
commit | eecd1577ef091007929c2b1e4b47494b1d1088d8 (patch) | |
tree | 1d268e481cd9e6040fe75fa2f7dba2914d618ff5 | |
parent | Fix 'content is not allowed in prolog' (diff) | |
download | master-sync-shared-eecd1577ef091007929c2b1e4b47494b1d1088d8.tar.gz master-sync-shared-eecd1577ef091007929c2b1e4b47494b1d1088d8.tar.xz master-sync-shared-eecd1577ef091007929c2b1e4b47494b1d1088d8.zip |
ThiriftHelper: Discard connections from pool after idling for too long
This is to avoid problems with firewalls/NATs that discard state after
a while and don't send RST packets back to the client.
-rw-r--r-- | src/main/java/org/openslx/thrifthelper/ThriftHandler.java | 135 |
1 files changed, 99 insertions, 36 deletions
diff --git a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java index e53eff9..e73c0ec 100644 --- a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java +++ b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java @@ -7,6 +7,7 @@ import java.util.ArrayDeque; import java.util.Collections; import java.util.Deque; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; import org.apache.logging.log4j.LogManager; @@ -16,44 +17,83 @@ import org.apache.thrift.TServiceClient; import org.apache.thrift.protocol.TProtocolException; import org.apache.thrift.transport.TTransportException; import org.openslx.thrifthelper.ThriftManager.ErrorCallback; +import org.openslx.util.QuickTimer; +import org.openslx.util.QuickTimer.Task; class ThriftHandler<T extends TServiceClient> implements InvocationHandler { private final static Logger LOGGER = LogManager.getLogger( ThriftHandler.class ); + + /** + * How long a client/connection should be allowed to be idle before we get rid of it and open a fresh one + */ + private final static long MAX_IDLE_MS = 110_000; protected interface WantClientCallback<T> { public T getNewClient(); } - private final Deque<T> pool = new ArrayDeque<>(); + /** Pool of (potentially) good connections, waiting for reuse */ + private final Deque<TWrap<T>> pool = new ArrayDeque<>(); + /** Old connections to be released by cleanup task */ + private Deque<TWrap<T>> trash = new ArrayDeque<>(); - private final WantClientCallback<? extends T> callback; + private final WantClientCallback<? extends T> clientFactory; private final ErrorCallback errorCallback; private final Set<String> thriftMethods; - public ThriftHandler( final Class<? extends T> clazz, WantClientCallback<? extends T> cb, ErrorCallback errCb ) + public ThriftHandler( final Class<? extends T> clazz, WantClientCallback<? extends T> clientFactory, ErrorCallback errCb ) { - errorCallback = errCb; - callback = cb; + this.errorCallback = errCb; + this.clientFactory = clientFactory; Set<String> tmpset = new HashSet<String>(); Method[] methods = clazz.getMethods(); + // Iterate over all methods of this class for ( int i = 0; i < methods.length; i++ ) { boolean thrift = false; + // If a method throws some form of TException, consider it a potential method from the thrift interface Class<?>[] type = methods[i].getExceptionTypes(); for ( int e = 0; e < type.length; e++ ) { - if ( TException.class.isAssignableFrom( type[e] ) ) + if ( TException.class.isAssignableFrom( type[e] ) ) { thrift = true; + break; + } } String name = methods[i].getName(); if ( thrift && !name.startsWith( "send_" ) && !name.startsWith( "recv_" ) ) { + // Exclude the send_ and recv_ helpers tmpset.add( name ); } } thriftMethods = Collections.unmodifiableSet( tmpset ); + // Periodically scan for old connections, in case the application idles for extended periods of time... + QuickTimer.scheduleAtFixedDelay( new Task() { + @Override + public void fire() + { + Deque<TWrap<T>> newTrash = new ArrayDeque<>(); + Deque<TWrap<T>> list; + long now = System.currentTimeMillis(); + synchronized ( pool ) { + list = trash; + trash = newTrash; + for ( Iterator<TWrap<T>> it = pool.iterator(); it.hasNext(); ) { + TWrap<T> client = it.next(); + if ( client.deadline < now ) { + list.add( client ); + it.remove(); + } + } + } + for ( TWrap<T> client : list ) { + freeClient( client ); + } + } + }, MAX_IDLE_MS * 5, MAX_IDLE_MS * 5 ); } @Override @@ -65,32 +105,33 @@ class ThriftHandler<T extends TServiceClient> implements InvocationHandler throw new IllegalAccessException( "Cannot call this method on a proxied thrift client" ); } - T client = getClient(); + TWrap<T> clientWrap = getClient(); try { Throwable cause = null; - for ( int i = 1;; i++ ) { - if ( client != null ) { - try { - return method.invoke( client, args ); - } catch ( InvocationTargetException e ) { - cause = e.getCause(); - if ( cause != null && ! ( cause instanceof TTransportException ) - && ! ( cause instanceof TProtocolException ) ) { - throw cause; - } - freeClient( client ); - client = null; - if ( cause == null ) { - cause = e; - } + for ( int i = 1; clientWrap != null; i++ ) { + try { + return method.invoke( clientWrap.client, args ); + } catch ( InvocationTargetException e ) { + cause = e.getCause(); // Get original exception + if ( cause != null && ! ( cause instanceof TTransportException ) + && ! ( cause instanceof TProtocolException ) ) { + // If it's not an exception potentially hinting at dead connection, just pass it on + throw cause; + } + // Get rid of broken connection + freeClient( clientWrap ); + clientWrap = null; + if ( cause == null ) { + cause = e; } } // Call the error callback. As long as true is returned, keep retrying if ( !errorCallback.thriftError( i, method.getName(), cause ) ) { break; } - if ( client == null ) { - client = getClient(); + // Apparently we should retry, get another client + if ( clientWrap == null ) { + clientWrap = getClient(); cause = null; } } @@ -100,44 +141,66 @@ class ThriftHandler<T extends TServiceClient> implements InvocationHandler throw cause; throw new TTransportException( "Could not connect" ); } finally { - returnClient( client ); + returnClient( clientWrap ); } } - private void freeClient( T client ) + private void freeClient( TWrap<T> client ) { try { - client.getInputProtocol().getTransport().close(); + client.client.getInputProtocol().getTransport().close(); } catch ( Exception e ) { } try { - client.getOutputProtocol().getTransport().close(); + client.client.getOutputProtocol().getTransport().close(); } catch ( Exception e ) { } } - private T getClient() + /** + * Get an available client connection. Prefer connection from pool, + * honoring the max idle time. If none are available, create a fresh + * client connection. + */ + private TWrap<T> getClient() { - T client; + long now = System.currentTimeMillis(); synchronized ( pool ) { - client = pool.poll(); - if ( client != null ) { - return client; + TWrap<T> client; + while ( ( client = pool.poll() ) != null ) { + if ( client.deadline >= now ) + return client; // Still fresh + trash.add( client ); } } - // Pool is closed, create new client + // No usable existing connection, create new client LOGGER.debug( "Creating new thrift client" ); - return callback.getNewClient(); + return new TWrap<T>( clientFactory.getNewClient() ); } - private void returnClient( T client ) + /** + * Return a client connection to the pool, updating its last + * use timestamp for proper idle timeout handling. + */ + private void returnClient( TWrap<T> client ) { if ( client == null ) return; + client.deadline = System.currentTimeMillis() + MAX_IDLE_MS; synchronized ( pool ) { pool.push( client ); } } + + private static class TWrap<T> + { + private final T client; + private long deadline; + public TWrap(T client) + { + this.client = client; + } + } } |