package org.openslx.thrifthelper; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayDeque; import java.util.Collections; import java.util.Deque; import java.util.HashSet; import java.util.Iterator; import java.util.Set; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; import org.apache.thrift.protocol.TProtocolException; import org.apache.thrift.transport.TTransportException; import org.openslx.thrifthelper.ThriftManager.ErrorCallback; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; class ThriftHandler implements InvocationHandler { private final static Logger LOGGER = LogManager.getLogger( ThriftHandler.class ); /** * How long a client/connection should be allowed to be idle before we get rid of it and open a fresh one */ private final static long MAX_IDLE_MS = 110_000; protected interface WantClientCallback { public T getNewClient(); } /** Pool of (potentially) good connections, waiting for reuse */ private final Deque> pool = new ArrayDeque<>(); /** Old connections to be released by cleanup task */ private Deque> trash = new ArrayDeque<>(); private final WantClientCallback clientFactory; private final ErrorCallback errorCallback; private final Set thriftMethods; public ThriftHandler( final Class clazz, WantClientCallback clientFactory, ErrorCallback errCb ) { this.errorCallback = errCb; this.clientFactory = clientFactory; Set tmpset = new HashSet(); Method[] methods = clazz.getMethods(); // Iterate over all methods of this class for ( int i = 0; i < methods.length; i++ ) { boolean thrift = false; // If a method throws some form of TException, consider it a potential method from the thrift interface Class[] type = methods[i].getExceptionTypes(); for ( int e = 0; e < type.length; e++ ) { if ( TException.class.isAssignableFrom( type[e] ) ) { thrift = true; break; } } String name = methods[i].getName(); if ( thrift && !name.startsWith( "send_" ) && !name.startsWith( "recv_" ) ) { // Exclude the send_ and recv_ helpers tmpset.add( name ); } } thriftMethods = Collections.unmodifiableSet( tmpset ); // Periodically scan for old connections, in case the application idles for extended periods of time... QuickTimer.scheduleAtFixedDelay( new Task() { @Override public void fire() { Deque> newTrash = new ArrayDeque<>(); Deque> list; long now = System.currentTimeMillis(); synchronized ( pool ) { list = trash; trash = newTrash; for ( Iterator> it = pool.iterator(); it.hasNext(); ) { TWrap client = it.next(); if ( client.deadline < now ) { list.add( client ); it.remove(); } } } for ( TWrap client : list ) { freeClient( client ); } } }, MAX_IDLE_MS * 5, MAX_IDLE_MS * 5 ); } @Override public Object invoke( Object tproxy, Method method, Object[] args ) throws Throwable { // first find the thrift methods if ( !thriftMethods.contains( method.getName() ) ) { throw new IllegalAccessException( "Cannot call this method on a proxied thrift client" ); } TWrap clientWrap = getClient(); try { Throwable cause = null; for ( int i = 1; clientWrap != null; i++ ) { try { return method.invoke( clientWrap.client, args ); } catch ( InvocationTargetException e ) { cause = e.getCause(); // Get original exception if ( cause != null && ! ( cause instanceof TTransportException ) && ! ( cause instanceof TProtocolException ) ) { // If it's not an exception potentially hinting at dead connection, just pass it on throw cause; } // Get rid of broken connection freeClient( clientWrap ); clientWrap = null; if ( cause == null ) { cause = e; } } // Call the error callback. As long as true is returned, keep retrying if ( !errorCallback.thriftError( i, method.getName(), cause ) ) { break; } // Apparently we should retry, get another client if ( clientWrap == null ) { clientWrap = getClient(); cause = null; } } // Uh oh if ( cause != null ) throw cause; throw new TTransportException( "Could not connect" ); } finally { returnClient( clientWrap ); } } private void freeClient( TWrap client ) { try { client.client.getInputProtocol().getTransport().close(); } catch ( Exception e ) { } try { client.client.getOutputProtocol().getTransport().close(); } catch ( Exception e ) { } } /** * Get an available client connection. Prefer connection from pool, * honoring the max idle time. If none are available, create a fresh * client connection. */ private TWrap getClient() { long now = System.currentTimeMillis(); synchronized ( pool ) { TWrap client; while ( ( client = pool.poll() ) != null ) { if ( client.deadline >= now ) return client; // Still fresh trash.add( client ); } } // No usable existing connection, create new client LOGGER.debug( "Creating new thrift client" ); return new TWrap( clientFactory.getNewClient() ); } /** * Return a client connection to the pool, updating its last * use timestamp for proper idle timeout handling. */ private void returnClient( TWrap client ) { if ( client == null ) return; client.deadline = System.currentTimeMillis() + MAX_IDLE_MS; synchronized ( pool ) { pool.push( client ); } } private static class TWrap { private final T client; private long deadline; public TWrap(T client) { this.client = client; } } }