diff options
Diffstat (limited to 'src/main/java/org/openslx/thrifthelper')
5 files changed, 210 insertions, 61 deletions
diff --git a/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java b/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java index 5cf0a10..5c5ef01 100644 --- a/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java +++ b/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java @@ -2,9 +2,13 @@ package org.openslx.thrifthelper; import org.openslx.bwlp.thrift.iface.ImagePublishData; -@SuppressWarnings( "serial" ) public class ImagePublishDataEx extends ImagePublishData { + /** + * Version for serialization. + */ + private static final long serialVersionUID = 314945044011262005L; + public String exImagePath; public boolean exIsValid; } diff --git a/src/main/java/org/openslx/thrifthelper/TBinaryProtocolSafe.java b/src/main/java/org/openslx/thrifthelper/TBinaryProtocolSafe.java index 86a2306..ca2bb2c 100644 --- a/src/main/java/org/openslx/thrifthelper/TBinaryProtocolSafe.java +++ b/src/main/java/org/openslx/thrifthelper/TBinaryProtocolSafe.java @@ -1,15 +1,26 @@ package org.openslx.thrifthelper; -import java.io.UnsupportedEncodingException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import javax.net.ssl.SSLException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TMessage; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolException; import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.layered.TLayeredTransport; /** * Binary protocol implementation for thrift. @@ -18,13 +29,20 @@ import org.apache.thrift.transport.TTransport; */ public class TBinaryProtocolSafe extends TBinaryProtocol { + + private final static Logger LOGGER = LogManager.getLogger( ThriftHandler.class ); + /** * Factory */ - @SuppressWarnings( "serial" ) public static class Factory implements TProtocolFactory { + /** + * Version for serialization. + */ + private static final long serialVersionUID = 6896537370338823740L; + protected boolean strictRead_ = false; protected boolean strictWrite_ = true; @@ -57,24 +75,53 @@ public class TBinaryProtocolSafe extends TBinaryProtocol public TBinaryProtocolSafe(TTransport trans, boolean strictRead, boolean strictWrite) { - super( trans ); - strictRead_ = strictRead; - strictWrite_ = strictWrite; + super( trans, maxLen, maxLen, strictRead, strictWrite ); } - /** + /* * Reading methods. */ + @Override public TMessage readMessageBegin() throws TException { - int size = readI32(); + int size; + try { + size = readI32(); + } catch ( TTransportException e ) { + // Do this to suppress certain SSL handshake errors that result from port scanning and service probing + if ( e.getCause() instanceof SSLException ) { + String m = e.getCause().getMessage(); + // We still want SSL errors that help diagnosing more specific SSL errors that relate to actual + // SSL handshake attempts, like incompatible TLS versions or ciphers. + if ( !m.contains( "Remote host terminated the handshake" ) + && !m.contains( "Unsupported or unrecognized SSL message" ) ) { + LOGGER.warn( getIp() + m ); + } + // Fake an END_OF_FILE exception, as the logException() method in the server class will + // ignore there. Let's hope it will stay ignored in the future. + throw new TTransportException( TTransportException.END_OF_FILE ); + } else if ( e.getCause() instanceof SocketException + && ( e.getCause().getMessage().contains( " timed out" ) + || e.getCause().getMessage().contains( "Connection reset" ) + || e.getCause().getMessage().contains( "Connection or inbound" ) ) ) { + // Faaaake + throw new TTransportException( TTransportException.END_OF_FILE ); + } else if ( e.getMessage().contains( "larger than max length" ) || e.getMessage().contains( "Read a negative frame size" ) ) { + // Also fake, since this one prints a whole stack trace compared to the other + // message by AbstractNonblockingServer + LOGGER.debug( e.getMessage(), e ); + throw new TTransportException( TTransportException.END_OF_FILE ); + } + throw e; + } if ( size > maxLen ) - throw new TProtocolException( TProtocolException.SIZE_LIMIT, "Payload too big." ); + throw new TProtocolException( TProtocolException.SIZE_LIMIT, getIp() + "Payload too big." ); if ( size < 0 ) { int version = size & VERSION_MASK; if ( version != VERSION_1 ) { - throw new TProtocolException( TProtocolException.BAD_VERSION, "Bad version in readMessageBegin" ); + LOGGER.warn( getIp() + "Bad version (" + version + ") in readMessageBegin" ); + throw new TTransportException( TTransportException.END_OF_FILE ); } return new TMessage( readString(), (byte) ( size & 0x000000ff ), readI32() ); } else { @@ -85,24 +132,43 @@ public class TBinaryProtocolSafe extends TBinaryProtocol } } + private String getIp() + { + TTransport t = trans_; + while ( t instanceof TLayeredTransport ) { + t = ( (TLayeredTransport)t ).getInnerTransport(); + } + InetAddress ia = null; + if ( t instanceof TSocket ) { + SocketAddress sa = ( (TSocket)t ).getSocket().getRemoteSocketAddress(); + if ( sa != null && ( sa instanceof InetSocketAddress ) ) + ia = ( (InetSocketAddress)sa ).getAddress(); + if ( ia == null ) + ia = ( (TSocket)t ).getSocket().getInetAddress(); + } else { + LOGGER.debug( "getIp(" + t.getClass().getSimpleName() + ")" ); + } + if ( ia == null ) + return ""; + return ia.getHostAddress() + ": "; + } + + @Override public String readString() throws TException { int size = readI32(); if ( size > maxLen ) throw new TProtocolException( TProtocolException.SIZE_LIMIT, "Payload too big." ); if ( trans_.getBytesRemainingInBuffer() >= size ) { - try { - String s = new String( trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8" ); - trans_.consumeBuffer( size ); - return s; - } catch ( UnsupportedEncodingException e ) { - throw new TException( "JVM DOES NOT SUPPORT UTF-8" ); - } + String s = new String( trans_.getBuffer(), trans_.getBufferPosition(), size, StandardCharsets.UTF_8 ); + trans_.consumeBuffer( size ); + return s; } return readStringBody( size ); } + @Override public ByteBuffer readBinary() throws TException { int size = readI32(); @@ -120,4 +186,3 @@ public class TBinaryProtocolSafe extends TBinaryProtocol } } - diff --git a/src/main/java/org/openslx/thrifthelper/TConst.java b/src/main/java/org/openslx/thrifthelper/TConst.java index 2ff902e..b7debba 100644 --- a/src/main/java/org/openslx/thrifthelper/TConst.java +++ b/src/main/java/org/openslx/thrifthelper/TConst.java @@ -9,5 +9,6 @@ public class TConst public static final String VIRT_VMWARE = "vmware"; public static final String VIRT_VIRTUALBOX = "virtualbox"; public static final String VIRT_QEMU = "qemukvm"; + public static final String VIRT_DOCKER = "docker"; } diff --git a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java index 05563d0..37c2897 100644 --- a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java +++ b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java @@ -7,51 +7,93 @@ import java.util.ArrayDeque; import java.util.Collections; import java.util.Deque; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; +import org.apache.thrift.protocol.TProtocolException; import org.apache.thrift.transport.TTransportException; import org.openslx.thrifthelper.ThriftManager.ErrorCallback; +import org.openslx.util.QuickTimer; +import org.openslx.util.QuickTimer.Task; class ThriftHandler<T extends TServiceClient> implements InvocationHandler { - private final static Logger LOGGER = Logger.getLogger( ThriftHandler.class ); + private final static Logger LOGGER = LogManager.getLogger( ThriftHandler.class ); + + /** + * How long a client/connection should be allowed to be idle before we get rid of it and open a fresh one + */ + private final static long MAX_IDLE_MS = 110_000; protected interface WantClientCallback<T> { public T getNewClient(); } - private final Deque<T> pool = new ArrayDeque<>(); + /** Pool of (potentially) good connections, waiting for reuse */ + private final Deque<TWrap<T>> pool = new ArrayDeque<>(); + /** Old connections to be released by cleanup task */ + private Deque<TWrap<T>> trash = new ArrayDeque<>(); - private final WantClientCallback<? extends T> callback; + private final WantClientCallback<? extends T> clientFactory; private final ErrorCallback errorCallback; private final Set<String> thriftMethods; - public ThriftHandler( final Class<? extends T> clazz, WantClientCallback<? extends T> cb, ErrorCallback errCb ) + public ThriftHandler( final Class<? extends T> clazz, WantClientCallback<? extends T> clientFactory, ErrorCallback errCb ) { - errorCallback = errCb; - callback = cb; + this.errorCallback = errCb; + this.clientFactory = clientFactory; Set<String> tmpset = new HashSet<String>(); Method[] methods = clazz.getMethods(); + // Iterate over all methods of this class for ( int i = 0; i < methods.length; i++ ) { boolean thrift = false; + // If a method throws some form of TException, consider it a potential method from the thrift interface Class<?>[] type = methods[i].getExceptionTypes(); for ( int e = 0; e < type.length; e++ ) { - if ( TException.class.isAssignableFrom( type[e] ) ) + if ( TException.class.isAssignableFrom( type[e] ) ) { thrift = true; + break; + } } String name = methods[i].getName(); if ( thrift && !name.startsWith( "send_" ) && !name.startsWith( "recv_" ) ) { + // Exclude the send_ and recv_ helpers tmpset.add( name ); } } thriftMethods = Collections.unmodifiableSet( tmpset ); + // Periodically scan for old connections, in case the application idles for extended periods of time... + QuickTimer.scheduleAtFixedDelay( new Task() { + @Override + public void fire() + { + Deque<TWrap<T>> newTrash = new ArrayDeque<>(); + Deque<TWrap<T>> list; + long now = System.currentTimeMillis(); + synchronized ( pool ) { + list = trash; + trash = newTrash; + for ( Iterator<TWrap<T>> it = pool.iterator(); it.hasNext(); ) { + TWrap<T> client = it.next(); + if ( client.deadline < now ) { + list.add( client ); + it.remove(); + } + } + } + for ( TWrap<T> client : list ) { + freeClient( client ); + } + } + }, MAX_IDLE_MS * 5, MAX_IDLE_MS * 5 ); } @Override @@ -63,31 +105,33 @@ class ThriftHandler<T extends TServiceClient> implements InvocationHandler throw new IllegalAccessException( "Cannot call this method on a proxied thrift client" ); } - T client = getClient(); + TWrap<T> clientWrap = getClient(); try { Throwable cause = null; - for ( int i = 1;; i++ ) { - if ( client != null ) { - try { - return method.invoke( client, args ); - } catch ( InvocationTargetException e ) { - cause = e.getCause(); - if ( cause != null && ! ( cause instanceof TException ) ) { - throw cause; - } - freeClient( client ); - client = null; - if ( cause == null ) { - cause = e; - } + for ( int i = 1; clientWrap != null; i++ ) { + try { + return method.invoke( clientWrap.client, args ); + } catch ( InvocationTargetException e ) { + cause = e.getCause(); // Get original exception + if ( cause != null && ! ( cause instanceof TTransportException ) + && ! ( cause instanceof TProtocolException ) ) { + // If it's not an exception potentially hinting at dead connection, just pass it on + throw cause; + } + // Get rid of broken connection + freeClient( clientWrap ); + clientWrap = null; + if ( cause == null ) { + cause = e; } } // Call the error callback. As long as true is returned, keep retrying if ( !errorCallback.thriftError( i, method.getName(), cause ) ) { break; } - if ( client == null ) { - client = getClient(); + // Apparently we should retry, get another client + if ( clientWrap == null ) { + clientWrap = getClient(); cause = null; } } @@ -97,44 +141,69 @@ class ThriftHandler<T extends TServiceClient> implements InvocationHandler throw cause; throw new TTransportException( "Could not connect" ); } finally { - returnClient( client ); + returnClient( clientWrap ); } } - private void freeClient( T client ) + private void freeClient( TWrap<T> client ) { try { - client.getInputProtocol().getTransport().close(); + client.client.getInputProtocol().getTransport().close(); } catch ( Exception e ) { } try { - client.getOutputProtocol().getTransport().close(); + client.client.getOutputProtocol().getTransport().close(); } catch ( Exception e ) { } } - private T getClient() + /** + * Get an available client connection. Prefer connection from pool, + * honoring the max idle time. If none are available, create a fresh + * client connection. + */ + private TWrap<T> getClient() { - T client; + long now = System.currentTimeMillis(); synchronized ( pool ) { - client = pool.poll(); - if ( client != null ) { - return client; + TWrap<T> client; + while ( ( client = pool.poll() ) != null ) { + if ( client.deadline >= now ) + return client; // Still fresh + trash.add( client ); } } - // Pool is closed, create new client + // No usable existing connection, create new client LOGGER.debug( "Creating new thrift client" ); - return callback.getNewClient(); + T client = clientFactory.getNewClient(); + if ( client == null ) + return null; + return new TWrap<T>( client ); } - private void returnClient( T client ) + /** + * Return a client connection to the pool, updating its last + * use timestamp for proper idle timeout handling. + */ + private void returnClient( TWrap<T> client ) { if ( client == null ) return; + client.deadline = System.currentTimeMillis() + MAX_IDLE_MS; synchronized ( pool ) { pool.push( client ); } } + + private static class TWrap<T> + { + private final T client; + private long deadline; + public TWrap(T client) + { + this.client = client; + } + } } diff --git a/src/main/java/org/openslx/thrifthelper/ThriftManager.java b/src/main/java/org/openslx/thrifthelper/ThriftManager.java index 2fe706d..07256b2 100644 --- a/src/main/java/org/openslx/thrifthelper/ThriftManager.java +++ b/src/main/java/org/openslx/thrifthelper/ThriftManager.java @@ -8,13 +8,14 @@ import java.net.Socket; import javax.net.SocketFactory; import javax.net.ssl.SSLContext; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.thrift.TServiceClient; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.layered.TFramedTransport; import org.openslx.bwlp.thrift.iface.MasterServer; import org.openslx.bwlp.thrift.iface.SatelliteServer; import org.openslx.thrifthelper.ThriftHandler.WantClientCallback; @@ -23,7 +24,7 @@ import org.openslx.util.Util; public class ThriftManager<T> { - private final static Logger LOGGER = Logger.getLogger( ThriftManager.class ); + private final static Logger LOGGER = LogManager.getLogger( ThriftManager.class ); public interface ErrorCallback { @@ -40,14 +41,13 @@ public class ThriftManager<T> private final T client; - @SuppressWarnings( "unchecked" ) private ThriftManager( Class<T> ifClazz, Class<? extends TServiceClient> clientClazz, WantClientCallback<? extends TServiceClient> internalCallback, ErrorCallback errorCb ) { - this.client = (T)Proxy.newProxyInstance( + this.client = ifClazz.cast( Proxy.newProxyInstance( ifClazz.getClassLoader(), new Class[] { ifClazz }, new ThriftHandler<TServiceClient>( - clientClazz, internalCallback, errorCb ) ); + clientClazz, internalCallback, errorCb ) ) ); } private static ThriftManager<MasterServer.Iface> masterManager = null; @@ -55,6 +55,7 @@ public class ThriftManager<T> private static ErrorCallback satErrorCallback = null; private static ErrorCallback masterErrorCallback = null; + private static SSLContext satSslContext = null; /** * Sets the address of the master server @@ -106,6 +107,7 @@ public class ThriftManager<T> LOGGER.error( "Given address is empty." ); return false; } + satSslContext = ctx; // finally set it satelliteManager = new ThriftManager<SatelliteServer.Iface>( SatelliteServer.Iface.class, SatelliteServer.Client.class, new WantClientCallback<SatelliteServer.Client>() { @@ -123,6 +125,14 @@ public class ThriftManager<T> } ); return true; } + + /** + * Get the SSL context used for talking to the satellite server (if any) + */ + public static SSLContext getSatelliteSslContext() + { + return satSslContext; + } /** * Returns the singleton client of the thrift connection to the satellite |