summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSimon Rettberg2015-08-12 13:06:32 +0200
committerSimon Rettberg2015-08-12 13:06:32 +0200
commit3403fd0c1a157a80ad5a2ec73cbbc46b874347db (patch)
tree1d08f6bd697eb91bb6f4e6964afc067f6486335f /src
parentwhoami() returns WhoamiInfo (diff)
downloadmaster-sync-shared-3403fd0c1a157a80ad5a2ec73cbbc46b874347db.tar.gz
master-sync-shared-3403fd0c1a157a80ad5a2ec73cbbc46b874347db.tar.xz
master-sync-shared-3403fd0c1a157a80ad5a2ec73cbbc46b874347db.zip
Use connection pool in thrift manager, allow getting explicit sat connection
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/openslx/thrifthelper/ThriftHandler.java87
-rw-r--r--src/main/java/org/openslx/thrifthelper/ThriftManager.java41
2 files changed, 75 insertions, 53 deletions
diff --git a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java
index 121d34b..84cd94d 100644
--- a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java
+++ b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java
@@ -3,7 +3,9 @@ package org.openslx.thrifthelper;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.ArrayDeque;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
@@ -23,7 +25,7 @@ class ThriftHandler<T extends Object> implements InvocationHandler
public boolean error( int failCount, String method, Throwable t );
}
- private final ThreadLocal<T> clients = new ThreadLocal<T>();
+ private final Deque<T> pool = new ArrayDeque<>();
private final EventCallback<T> callback;
public ThriftHandler( final Class<T> clazz, EventCallback<T> cb )
@@ -59,7 +61,7 @@ class ThriftHandler<T extends Object> implements InvocationHandler
// first find the thrift methods
if ( !thriftMethods.contains( method.getName() ) ) {
try {
- return method.invoke( getClient( false ), args );
+ return method.invoke( getClient(), args );
} catch ( InvocationTargetException e ) {
Throwable cause = e.getCause();
if ( cause == null ) {
@@ -69,45 +71,62 @@ class ThriftHandler<T extends Object> implements InvocationHandler
}
}
- T client = getClient( false );
- Throwable cause = null;
- for ( int i = 1;; i++ ) {
- if ( client == null ) {
- LOGGER.debug( "Transport error - re-initialising ..." );
- client = getClient( true );
- }
- if ( client != null ) {
- try {
- return method.invoke( client, args );
- } catch ( InvocationTargetException e ) {
- cause = e.getCause();
- if ( cause != null && ! ( cause instanceof TTransportException ) )
- throw cause;
- client = null;
- if ( cause == null )
- cause = e;
+ T client = getClient();
+ try {
+ Throwable cause = null;
+ for ( int i = 1;; i++ ) {
+ if ( client == null ) {
+ LOGGER.debug( "Transport error - re-initialising ..." );
+ client = getClient();
+ }
+ if ( client != null ) {
+ try {
+ return method.invoke( client, args );
+ } catch ( InvocationTargetException e ) {
+ cause = e.getCause();
+ if ( cause != null && ! ( cause instanceof TTransportException ) ) {
+ throw cause;
+ }
+ client = null;
+ if ( cause == null )
+ cause = e;
+ }
+ }
+ // Call the error callback. As long as true is returned, keep retrying
+ if ( !callback.error( i, method.getName(), cause ) ) {
+ break;
}
}
- // Call the error callback. As long as true is returned, keep retrying
- if ( !callback.error( i, method.getName(), cause ) ) {
- break;
- }
+
+ // Uh oh
+ if ( cause != null )
+ throw cause;
+ throw new TTransportException( "Could not connect" );
+ } finally {
+ returnClient( client );
}
+ }
- // Uh oh
- if ( cause != null )
- throw cause;
- throw new TTransportException( "Could not connect" );
+ private T getClient()
+ {
+ T client;
+ synchronized ( pool ) {
+ client = pool.poll();
+ if ( client != null )
+ return client;
+ }
+ // Pool is closed, create new client
+ LOGGER.debug( "Creating new thrift client" );
+ return callback.getNewClient();
}
- private T getClient( boolean forceNew )
+ private void returnClient( T client )
{
- T client = clients.get();
- if ( client != null && !forceNew ) {
- return client;
+ if ( client == null )
+ return;
+ synchronized ( pool ) {
+ pool.push( client );
}
- client = callback.getNewClient();
- clients.set( client );
- return client;
}
+
}
diff --git a/src/main/java/org/openslx/thrifthelper/ThriftManager.java b/src/main/java/org/openslx/thrifthelper/ThriftManager.java
index 4ca6a0b..b132a9e 100644
--- a/src/main/java/org/openslx/thrifthelper/ThriftManager.java
+++ b/src/main/java/org/openslx/thrifthelper/ThriftManager.java
@@ -11,6 +11,7 @@ import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.openslx.bwlp.thrift.iface.MasterServer;
import org.openslx.bwlp.thrift.iface.SatelliteServer;
+import org.openslx.bwlp.thrift.iface.SatelliteServer.Client;
import org.openslx.thrifthelper.ThriftHandler.EventCallback;
public class ThriftManager
@@ -63,23 +64,7 @@ public class ThriftManager
LOGGER.error( "Satellite ip adress was not set prior to getting the sat client. Use setSatelliteAddress(<addr>)." );
return null;
}
- // ok lets do it
- TTransport transport = new TFramedTransport(
- new TSocket(
- SATELLITE_IP, SATELLITE_PORT, SATELLITE_TIMEOUT ) );
- try {
- transport.open();
- } catch ( TTransportException e ) {
- LOGGER.error( "Could not open transport to thrift's server with IP: " + SATELLITE_IP );
- transport.close();
- return null;
- }
- final TProtocol protocol = new TBinaryProtocol(
- transport );
- // now we are ready to create the client, according to ClientType!
- LOGGER.info( "Satellite '" + SATELLITE_IP + "' reachable. Client initialised." );
- return new SatelliteServer.Client(
- protocol );
+ return getNewSatClient( SATELLITE_IP );
}
@Override
@@ -120,8 +105,7 @@ public class ThriftManager
final TProtocol protocol = new TBinaryProtocol(
transport );
// now we are ready to create the client, according to ClientType!
- return new MasterServer.Client(
- protocol );
+ return new MasterServer.Client( protocol );
}
@@ -208,4 +192,23 @@ public class ThriftManager
errorCallback = cb;
}
}
+
+ public static Client getNewSatClient( String satelliteIp )
+ {
+ // ok lets do it
+ TTransport transport = new TFramedTransport(
+ new TSocket( satelliteIp, SATELLITE_PORT, SATELLITE_TIMEOUT ) );
+ try {
+ transport.open();
+ } catch ( TTransportException e ) {
+ LOGGER.error( "Could not open transport to thrift's server with IP: " + SATELLITE_IP );
+ transport.close();
+ return null;
+ }
+ final TProtocol protocol = new TBinaryProtocol( transport );
+ // now we are ready to create the client, according to ClientType!
+ LOGGER.info( "Satellite '" + satelliteIp + "' reachable. Client initialised." );
+ return new SatelliteServer.Client( protocol );
+ }
+
}