diff options
author | Simon Rettberg | 2015-08-12 13:06:32 +0200 |
---|---|---|
committer | Simon Rettberg | 2015-08-12 13:06:32 +0200 |
commit | 3403fd0c1a157a80ad5a2ec73cbbc46b874347db (patch) | |
tree | 1d08f6bd697eb91bb6f4e6964afc067f6486335f /src | |
parent | whoami() returns WhoamiInfo (diff) | |
download | master-sync-shared-3403fd0c1a157a80ad5a2ec73cbbc46b874347db.tar.gz master-sync-shared-3403fd0c1a157a80ad5a2ec73cbbc46b874347db.tar.xz master-sync-shared-3403fd0c1a157a80ad5a2ec73cbbc46b874347db.zip |
Use connection pool in thrift manager, allow getting explicit sat connection
Diffstat (limited to 'src')
-rw-r--r-- | src/main/java/org/openslx/thrifthelper/ThriftHandler.java | 87 | ||||
-rw-r--r-- | src/main/java/org/openslx/thrifthelper/ThriftManager.java | 41 |
2 files changed, 75 insertions, 53 deletions
diff --git a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java index 121d34b..84cd94d 100644 --- a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java +++ b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java @@ -3,7 +3,9 @@ package org.openslx.thrifthelper; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayDeque; import java.util.Collections; +import java.util.Deque; import java.util.HashSet; import java.util.Set; @@ -23,7 +25,7 @@ class ThriftHandler<T extends Object> implements InvocationHandler public boolean error( int failCount, String method, Throwable t ); } - private final ThreadLocal<T> clients = new ThreadLocal<T>(); + private final Deque<T> pool = new ArrayDeque<>(); private final EventCallback<T> callback; public ThriftHandler( final Class<T> clazz, EventCallback<T> cb ) @@ -59,7 +61,7 @@ class ThriftHandler<T extends Object> implements InvocationHandler // first find the thrift methods if ( !thriftMethods.contains( method.getName() ) ) { try { - return method.invoke( getClient( false ), args ); + return method.invoke( getClient(), args ); } catch ( InvocationTargetException e ) { Throwable cause = e.getCause(); if ( cause == null ) { @@ -69,45 +71,62 @@ class ThriftHandler<T extends Object> implements InvocationHandler } } - T client = getClient( false ); - Throwable cause = null; - for ( int i = 1;; i++ ) { - if ( client == null ) { - LOGGER.debug( "Transport error - re-initialising ..." ); - client = getClient( true ); - } - if ( client != null ) { - try { - return method.invoke( client, args ); - } catch ( InvocationTargetException e ) { - cause = e.getCause(); - if ( cause != null && ! ( cause instanceof TTransportException ) ) - throw cause; - client = null; - if ( cause == null ) - cause = e; + T client = getClient(); + try { + Throwable cause = null; + for ( int i = 1;; i++ ) { + if ( client == null ) { + LOGGER.debug( "Transport error - re-initialising ..." ); + client = getClient(); + } + if ( client != null ) { + try { + return method.invoke( client, args ); + } catch ( InvocationTargetException e ) { + cause = e.getCause(); + if ( cause != null && ! ( cause instanceof TTransportException ) ) { + throw cause; + } + client = null; + if ( cause == null ) + cause = e; + } + } + // Call the error callback. As long as true is returned, keep retrying + if ( !callback.error( i, method.getName(), cause ) ) { + break; } } - // Call the error callback. As long as true is returned, keep retrying - if ( !callback.error( i, method.getName(), cause ) ) { - break; - } + + // Uh oh + if ( cause != null ) + throw cause; + throw new TTransportException( "Could not connect" ); + } finally { + returnClient( client ); } + } - // Uh oh - if ( cause != null ) - throw cause; - throw new TTransportException( "Could not connect" ); + private T getClient() + { + T client; + synchronized ( pool ) { + client = pool.poll(); + if ( client != null ) + return client; + } + // Pool is closed, create new client + LOGGER.debug( "Creating new thrift client" ); + return callback.getNewClient(); } - private T getClient( boolean forceNew ) + private void returnClient( T client ) { - T client = clients.get(); - if ( client != null && !forceNew ) { - return client; + if ( client == null ) + return; + synchronized ( pool ) { + pool.push( client ); } - client = callback.getNewClient(); - clients.set( client ); - return client; } + } diff --git a/src/main/java/org/openslx/thrifthelper/ThriftManager.java b/src/main/java/org/openslx/thrifthelper/ThriftManager.java index 4ca6a0b..b132a9e 100644 --- a/src/main/java/org/openslx/thrifthelper/ThriftManager.java +++ b/src/main/java/org/openslx/thrifthelper/ThriftManager.java @@ -11,6 +11,7 @@ import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.openslx.bwlp.thrift.iface.MasterServer; import org.openslx.bwlp.thrift.iface.SatelliteServer; +import org.openslx.bwlp.thrift.iface.SatelliteServer.Client; import org.openslx.thrifthelper.ThriftHandler.EventCallback; public class ThriftManager @@ -63,23 +64,7 @@ public class ThriftManager LOGGER.error( "Satellite ip adress was not set prior to getting the sat client. Use setSatelliteAddress(<addr>)." ); return null; } - // ok lets do it - TTransport transport = new TFramedTransport( - new TSocket( - SATELLITE_IP, SATELLITE_PORT, SATELLITE_TIMEOUT ) ); - try { - transport.open(); - } catch ( TTransportException e ) { - LOGGER.error( "Could not open transport to thrift's server with IP: " + SATELLITE_IP ); - transport.close(); - return null; - } - final TProtocol protocol = new TBinaryProtocol( - transport ); - // now we are ready to create the client, according to ClientType! - LOGGER.info( "Satellite '" + SATELLITE_IP + "' reachable. Client initialised." ); - return new SatelliteServer.Client( - protocol ); + return getNewSatClient( SATELLITE_IP ); } @Override @@ -120,8 +105,7 @@ public class ThriftManager final TProtocol protocol = new TBinaryProtocol( transport ); // now we are ready to create the client, according to ClientType! - return new MasterServer.Client( - protocol ); + return new MasterServer.Client( protocol ); } @@ -208,4 +192,23 @@ public class ThriftManager errorCallback = cb; } } + + public static Client getNewSatClient( String satelliteIp ) + { + // ok lets do it + TTransport transport = new TFramedTransport( + new TSocket( satelliteIp, SATELLITE_PORT, SATELLITE_TIMEOUT ) ); + try { + transport.open(); + } catch ( TTransportException e ) { + LOGGER.error( "Could not open transport to thrift's server with IP: " + SATELLITE_IP ); + transport.close(); + return null; + } + final TProtocol protocol = new TBinaryProtocol( transport ); + // now we are ready to create the client, according to ClientType! + LOGGER.info( "Satellite '" + satelliteIp + "' reachable. Client initialised." ); + return new SatelliteServer.Client( protocol ); + } + } |