diff options
author | Simon Rettberg | 2015-08-18 19:48:59 +0200 |
---|---|---|
committer | Simon Rettberg | 2015-08-18 19:48:59 +0200 |
commit | df9ea600ce311bdf907678d0da94eea5b39d890f (patch) | |
tree | 5d822056590456bfa666747f9f9e0182cebdb5e3 /src/main/java/org/openslx/thrifthelper | |
parent | Fix getNewSatClient() (diff) | |
download | master-sync-shared-df9ea600ce311bdf907678d0da94eea5b39d890f.tar.gz master-sync-shared-df9ea600ce311bdf907678d0da94eea5b39d890f.tar.xz master-sync-shared-df9ea600ce311bdf907678d0da94eea5b39d890f.zip |
First working version with SSL support
Diffstat (limited to 'src/main/java/org/openslx/thrifthelper')
-rw-r--r-- | src/main/java/org/openslx/thrifthelper/ThriftHandler.java | 25 | ||||
-rw-r--r-- | src/main/java/org/openslx/thrifthelper/ThriftManager.java | 231 |
2 files changed, 113 insertions, 143 deletions
diff --git a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java index c645dd8..01ed0b0 100644 --- a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java +++ b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java @@ -13,27 +13,29 @@ import org.apache.log4j.Logger; import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; import org.apache.thrift.transport.TTransportException; +import org.openslx.thrifthelper.ThriftManager.ErrorCallback; class ThriftHandler<T extends TServiceClient> implements InvocationHandler { private final static Logger LOGGER = Logger.getLogger( ThriftHandler.class ); - public interface EventCallback<T> + protected interface WantClientCallback<T> { public T getNewClient(); - - public boolean error( int failCount, String method, Throwable t ); } private final Deque<T> pool = new ArrayDeque<>(); - private final EventCallback<T> callback; + private final WantClientCallback<? extends T> callback; + + private final ErrorCallback errorCallback; private final Set<String> thriftMethods; - public ThriftHandler( final Class<T> clazz, EventCallback<T> cb ) + public ThriftHandler( final Class<? extends T> clazz, WantClientCallback<? extends T> cb, ErrorCallback errCb ) { + errorCallback = errCb; callback = cb; Set<String> tmpset = new HashSet<String>(); Method[] methods = clazz.getMethods(); @@ -85,14 +87,14 @@ class ThriftHandler<T extends TServiceClient> implements InvocationHandler if ( cause != null && ! ( cause instanceof TTransportException ) ) { throw cause; } - freeClient(client); + freeClient( client ); client = null; if ( cause == null ) cause = e; } } // Call the error callback. As long as true is returned, keep retrying - if ( !callback.error( i, method.getName(), cause ) ) { + if ( !errorCallback.thriftError( i, method.getName(), cause ) ) { break; } } @@ -106,16 +108,17 @@ class ThriftHandler<T extends TServiceClient> implements InvocationHandler } } - private void freeClient(T client) { + private void freeClient( T client ) + { try { client.getInputProtocol().getTransport().close(); - } catch (Exception e) { + } catch ( Exception e ) { } try { client.getOutputProtocol().getTransport().close(); - } catch (Exception e) { + } catch ( Exception e ) { } - + } private T getClient() diff --git a/src/main/java/org/openslx/thrifthelper/ThriftManager.java b/src/main/java/org/openslx/thrifthelper/ThriftManager.java index 4c08102..c276034 100644 --- a/src/main/java/org/openslx/thrifthelper/ThriftManager.java +++ b/src/main/java/org/openslx/thrifthelper/ThriftManager.java @@ -8,18 +8,18 @@ import java.net.Socket; import javax.net.ssl.SSLContext; import org.apache.log4j.Logger; +import org.apache.thrift.TServiceClient; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.openslx.bwlp.thrift.iface.MasterServer; import org.openslx.bwlp.thrift.iface.SatelliteServer; -import org.openslx.thrifthelper.ThriftHandler.EventCallback; +import org.openslx.thrifthelper.ThriftHandler.WantClientCallback; import org.openslx.util.Util; -public class ThriftManager +public class ThriftManager<T> { private final static Logger LOGGER = Logger.getLogger( ThriftManager.class ); @@ -37,73 +37,23 @@ public class ThriftManager public boolean thriftError( int failCount, String method, Throwable t ); } - private static ErrorCallback masterErrorCallback = null; - - private static ErrorCallback satelliteErrorCallback = null; - - /** - * Private members for master connection information - */ - private static String MASTERSERVER_ADDRESS = null; - private static final int MASTERSERVER_PORT = 9090; - private static final int MASTERSERVER_TIMEOUT = 15000; + private final T client; - /** - * Private members for satellite connection information - */ - private static String SATELLITE_IP = null; - private static final int SATELLITE_PORT = 9090; - private static final int SATELLITE_TIMEOUT = 15000; + @SuppressWarnings( "unchecked" ) + private ThriftManager( Class<T> ifClazz, Class<? extends TServiceClient> clientClazz, + WantClientCallback<? extends TServiceClient> internalCallback, ErrorCallback errorCb ) + { + this.client = (T)Proxy.newProxyInstance( + ifClazz.getClassLoader(), + new Class[] { ifClazz }, new ThriftHandler<TServiceClient>( + clientClazz, internalCallback, errorCb ) ); + } - /** - * Sat connection. Initialized when we know the sat server IP. - */ - private static SatelliteServer.Iface satClient = (SatelliteServer.Iface)Proxy.newProxyInstance( - SatelliteServer.Iface.class.getClassLoader(), - new Class[] { SatelliteServer.Iface.class }, new ThriftHandler<SatelliteServer.Client>( - SatelliteServer.Client.class, new EventCallback<SatelliteServer.Client>() { - - @Override - public SatelliteServer.Client getNewClient() - { - // first check if we have a sat ip - if ( SATELLITE_IP == null ) { - LOGGER.error( "Satellite ip adress was not set prior to getting the sat client. Use setSatelliteAddress(<addr>)." ); - return null; - } - return getNewSatClient( SATELLITE_IP ); - } - - @Override - public boolean error( int failCount, String method, Throwable t ) - { - return satelliteErrorCallback != null && satelliteErrorCallback.thriftError( failCount, method, t ); - } - } ) ); + private static ThriftManager<MasterServer.Iface> masterManager = null; + private static ThriftManager<SatelliteServer.Iface> satelliteManager = null; - /** - * Master connection. As its address is known in advance, create the object right away. - */ - private static MasterServer.Iface masterClient = (MasterServer.Iface)Proxy.newProxyInstance( - MasterServer.Iface.class.getClassLoader(), - new Class[] { MasterServer.Iface.class }, new ThriftHandler<MasterServer.Client>( - MasterServer.Client.class, new EventCallback<MasterServer.Client>() { - - @Override - public MasterServer.Client getNewClient() - { - return getNewMasterClient(); - - } - - @Override - public boolean error( int failCount, String method, Throwable t ) - { - synchronized ( LOGGER ) { - return masterErrorCallback != null && masterErrorCallback.thriftError( failCount, method, t ); - } - } - } ) ); + private static ErrorCallback satErrorCallback = null; + private static ErrorCallback masterErrorCallback = null; /** * Sets the address of the master server @@ -111,9 +61,9 @@ public class ThriftManager * @param host the ip/hostname of the master server * @return true if setting the address worked, false otherwise */ - public static boolean setMasterServerAddress( String host ) + public static synchronized boolean setMasterServerAddress( final SSLContext ctx, final String host, final int port, final int timeout ) { - if ( MASTERSERVER_ADDRESS != null ) { + if ( masterManager != null ) { LOGGER.error( "Master server address already set." ); return false; } @@ -122,7 +72,20 @@ public class ThriftManager return false; } // finally set it - MASTERSERVER_ADDRESS = host; + masterManager = new ThriftManager<MasterServer.Iface>( MasterServer.Iface.class, MasterServer.Client.class, + new WantClientCallback<MasterServer.Client>() { + @Override + public MasterServer.Client getNewClient() + { + return getNewMasterClient( ctx, host, port, timeout ); + } + }, new ErrorCallback() { + @Override + public boolean thriftError( int failCount, String method, Throwable t ) + { + return masterErrorCallback != null && masterErrorCallback.thriftError( failCount, method, t ); + } + } ); return true; } @@ -132,18 +95,31 @@ public class ThriftManager * @param host the ip/hostname of the satellite * @return true if setting the address worked, false otherwise */ - public static boolean setSatelliteAddress( String host ) + public static synchronized boolean setSatelliteAddress( final SSLContext ctx, final String host, final int port, final int timeout ) { - if ( SATELLITE_IP != null ) { - LOGGER.error( "Satellite address already set." ); + if ( satelliteManager != null ) { + LOGGER.error( "Satellite server address already set." ); return false; } if ( host.isEmpty() ) { - LOGGER.error( "Given address for satellite is empty." ); + LOGGER.error( "Given address is empty." ); return false; } // finally set it - SATELLITE_IP = host; + satelliteManager = new ThriftManager<SatelliteServer.Iface>( SatelliteServer.Iface.class, SatelliteServer.Client.class, + new WantClientCallback<SatelliteServer.Client>() { + @Override + public SatelliteServer.Client getNewClient() + { + return getNewSatelliteClient( ctx, host, port, timeout ); + } + }, new ErrorCallback() { + @Override + public boolean thriftError( int failCount, String method, Throwable t ) + { + return satErrorCallback != null && satErrorCallback.thriftError( failCount, method, t ); + } + } ); return true; } @@ -154,7 +130,11 @@ public class ThriftManager */ public static SatelliteServer.Iface getSatClient() { - return satClient; + if ( satelliteManager == null ) { + LOGGER.error( "Satellite server adress was not set prior to getting the client. Use setMasterServerAddress(<addr>)." ); + return null; + } + return satelliteManager.client; } /** @@ -164,7 +144,11 @@ public class ThriftManager */ public static MasterServer.Iface getMasterClient() { - return masterClient; + if ( masterManager == null ) { + LOGGER.error( "Master server adress was not set prior to getting the client. Use setMasterServerAddress(<addr>)." ); + return null; + } + return masterManager.client; } /** @@ -173,11 +157,9 @@ public class ThriftManager * * @param cb */ - public static void setMasterErrorCallback( ErrorCallback cb ) + public static synchronized void setMasterErrorCallback( ErrorCallback cb ) { - synchronized ( LOGGER ) { - masterErrorCallback = cb; - } + masterErrorCallback = cb; } /** @@ -186,68 +168,53 @@ public class ThriftManager * * @param cb */ - public static void setSatelliteErrorCallback( ErrorCallback cb ) + public static synchronized void setSatelliteErrorCallback( ErrorCallback cb ) { - synchronized ( LOGGER ) { - satelliteErrorCallback = cb; - } + satErrorCallback = cb; } - public static SatelliteServer.Client getNewSatClient( String satelliteIp ) + public static MasterServer.Client getNewMasterClient( SSLContext ctx, String address, int port, int timeout ) { - TTransport transport = null; - try { - transport = newTransport( null, satelliteIp, SATELLITE_PORT, SATELLITE_TIMEOUT ); - } catch ( TTransportException e ) { - LOGGER.error( "Could not open transport to thrift's server with IP: " + satelliteIp ); + TProtocol protocol = newTransport( ctx, address, port, timeout ); + if ( protocol == null ) return null; - } - final TProtocol protocol = new TBinaryProtocol( transport ); - // now we are ready to create the client, according to ClientType! - LOGGER.info( "Satellite '" + satelliteIp + "' reachable. Client initialised." ); - return new SatelliteServer.Client( protocol ); + return new MasterServer.Client( protocol ); } - - public static MasterServer.Client getNewMasterClient() { - // first check if we have a sat ip - if ( MASTERSERVER_ADDRESS == null ) { - LOGGER.error( "Master server adress was not set prior to getting the client. Use setMasterServerAddress(<addr>)." ); - return null; - } - - TTransport transport; - try { - transport = newTransport( null, MASTERSERVER_ADDRESS, MASTERSERVER_PORT, MASTERSERVER_TIMEOUT ); - } catch ( TTransportException e ) { - LOGGER.error( "Could not open transport to thrift's server with IP: " + MASTERSERVER_ADDRESS ); + + public static SatelliteServer.Client getNewSatelliteClient( SSLContext ctx, String address, int port, int timeout ) + { + TProtocol protocol = newTransport( ctx, address, port, timeout ); + if ( protocol == null ) return null; - } - final TProtocol protocol = new TBinaryProtocol( - transport ); - // now we are ready to create the client, according to ClientType! - return new MasterServer.Client( protocol ); + return new SatelliteServer.Client( protocol ); } - - private static TTransport newTransport( SSLContext ctx, String host, int port, int timeout ) throws TTransportException { - TSocket tsock; - if (ctx == null) { - tsock = new TSocket( host, port, timeout ); - tsock.open(); - } else { - Socket socket = null; - try { - socket = ctx.getSocketFactory().createSocket(); - socket.setSoTimeout(timeout); - socket.connect( new InetSocketAddress( host, port ), timeout ); - } catch (IOException e) { - if ( socket != null ) { - Util.safeClose( socket ); + + private static TProtocol newTransport( SSLContext ctx, String host, int port, int timeout ) + { + try { + TSocket tsock; + if ( ctx == null ) { + tsock = new TSocket( host, port, timeout ); + tsock.open(); + } else { + Socket socket = null; + try { + socket = ctx.getSocketFactory().createSocket(); + socket.setSoTimeout( timeout ); + socket.connect( new InetSocketAddress( host, port ), timeout ); + } catch ( IOException e ) { + if ( socket != null ) { + Util.safeClose( socket ); + } + throw new TTransportException(); } - throw new TTransportException(); + tsock = new TSocket( socket ); } - tsock = new TSocket( socket ); + return new TBinaryProtocol( new TFramedTransport( tsock ) ); + } catch ( TTransportException e ) { + LOGGER.error( "Could not open transport to thrift server at " + host + ":" + port ); + return null; } - return new TFramedTransport( tsock ); } } |