summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/thrifthelper
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/thrifthelper')
-rw-r--r--src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java6
-rw-r--r--src/main/java/org/openslx/thrifthelper/TBinaryProtocolSafe.java99
-rw-r--r--src/main/java/org/openslx/thrifthelper/TConst.java1
-rw-r--r--src/main/java/org/openslx/thrifthelper/ThriftHandler.java143
-rw-r--r--src/main/java/org/openslx/thrifthelper/ThriftManager.java22
5 files changed, 210 insertions, 61 deletions
diff --git a/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java b/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java
index 5cf0a10..5c5ef01 100644
--- a/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java
+++ b/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java
@@ -2,9 +2,13 @@ package org.openslx.thrifthelper;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
-@SuppressWarnings( "serial" )
public class ImagePublishDataEx extends ImagePublishData
{
+ /**
+ * Version for serialization.
+ */
+ private static final long serialVersionUID = 314945044011262005L;
+
public String exImagePath;
public boolean exIsValid;
}
diff --git a/src/main/java/org/openslx/thrifthelper/TBinaryProtocolSafe.java b/src/main/java/org/openslx/thrifthelper/TBinaryProtocolSafe.java
index 86a2306..ca2bb2c 100644
--- a/src/main/java/org/openslx/thrifthelper/TBinaryProtocolSafe.java
+++ b/src/main/java/org/openslx/thrifthelper/TBinaryProtocolSafe.java
@@ -1,15 +1,26 @@
package org.openslx.thrifthelper;
-import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import javax.net.ssl.SSLException;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMessage;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.layered.TLayeredTransport;
/**
* Binary protocol implementation for thrift.
@@ -18,13 +29,20 @@ import org.apache.thrift.transport.TTransport;
*/
public class TBinaryProtocolSafe extends TBinaryProtocol
{
+
+ private final static Logger LOGGER = LogManager.getLogger( ThriftHandler.class );
+
/**
* Factory
*/
- @SuppressWarnings( "serial" )
public static class Factory implements TProtocolFactory
{
+ /**
+ * Version for serialization.
+ */
+ private static final long serialVersionUID = 6896537370338823740L;
+
protected boolean strictRead_ = false;
protected boolean strictWrite_ = true;
@@ -57,24 +75,53 @@ public class TBinaryProtocolSafe extends TBinaryProtocol
public TBinaryProtocolSafe(TTransport trans, boolean strictRead, boolean strictWrite)
{
- super( trans );
- strictRead_ = strictRead;
- strictWrite_ = strictWrite;
+ super( trans, maxLen, maxLen, strictRead, strictWrite );
}
- /**
+ /*
* Reading methods.
*/
+ @Override
public TMessage readMessageBegin() throws TException
{
- int size = readI32();
+ int size;
+ try {
+ size = readI32();
+ } catch ( TTransportException e ) {
+ // Do this to suppress certain SSL handshake errors that result from port scanning and service probing
+ if ( e.getCause() instanceof SSLException ) {
+ String m = e.getCause().getMessage();
+ // We still want SSL errors that help diagnosing more specific SSL errors that relate to actual
+ // SSL handshake attempts, like incompatible TLS versions or ciphers.
+ if ( !m.contains( "Remote host terminated the handshake" )
+ && !m.contains( "Unsupported or unrecognized SSL message" ) ) {
+ LOGGER.warn( getIp() + m );
+ }
+ // Fake an END_OF_FILE exception, as the logException() method in the server class will
+ // ignore there. Let's hope it will stay ignored in the future.
+ throw new TTransportException( TTransportException.END_OF_FILE );
+ } else if ( e.getCause() instanceof SocketException
+ && ( e.getCause().getMessage().contains( " timed out" )
+ || e.getCause().getMessage().contains( "Connection reset" )
+ || e.getCause().getMessage().contains( "Connection or inbound" ) ) ) {
+ // Faaaake
+ throw new TTransportException( TTransportException.END_OF_FILE );
+ } else if ( e.getMessage().contains( "larger than max length" ) || e.getMessage().contains( "Read a negative frame size" ) ) {
+ // Also fake, since this one prints a whole stack trace compared to the other
+ // message by AbstractNonblockingServer
+ LOGGER.debug( e.getMessage(), e );
+ throw new TTransportException( TTransportException.END_OF_FILE );
+ }
+ throw e;
+ }
if ( size > maxLen )
- throw new TProtocolException( TProtocolException.SIZE_LIMIT, "Payload too big." );
+ throw new TProtocolException( TProtocolException.SIZE_LIMIT, getIp() + "Payload too big." );
if ( size < 0 ) {
int version = size & VERSION_MASK;
if ( version != VERSION_1 ) {
- throw new TProtocolException( TProtocolException.BAD_VERSION, "Bad version in readMessageBegin" );
+ LOGGER.warn( getIp() + "Bad version (" + version + ") in readMessageBegin" );
+ throw new TTransportException( TTransportException.END_OF_FILE );
}
return new TMessage( readString(), (byte) ( size & 0x000000ff ), readI32() );
} else {
@@ -85,24 +132,43 @@ public class TBinaryProtocolSafe extends TBinaryProtocol
}
}
+ private String getIp()
+ {
+ TTransport t = trans_;
+ while ( t instanceof TLayeredTransport ) {
+ t = ( (TLayeredTransport)t ).getInnerTransport();
+ }
+ InetAddress ia = null;
+ if ( t instanceof TSocket ) {
+ SocketAddress sa = ( (TSocket)t ).getSocket().getRemoteSocketAddress();
+ if ( sa != null && ( sa instanceof InetSocketAddress ) )
+ ia = ( (InetSocketAddress)sa ).getAddress();
+ if ( ia == null )
+ ia = ( (TSocket)t ).getSocket().getInetAddress();
+ } else {
+ LOGGER.debug( "getIp(" + t.getClass().getSimpleName() + ")" );
+ }
+ if ( ia == null )
+ return "";
+ return ia.getHostAddress() + ": ";
+ }
+
+ @Override
public String readString() throws TException
{
int size = readI32();
if ( size > maxLen )
throw new TProtocolException( TProtocolException.SIZE_LIMIT, "Payload too big." );
if ( trans_.getBytesRemainingInBuffer() >= size ) {
- try {
- String s = new String( trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8" );
- trans_.consumeBuffer( size );
- return s;
- } catch ( UnsupportedEncodingException e ) {
- throw new TException( "JVM DOES NOT SUPPORT UTF-8" );
- }
+ String s = new String( trans_.getBuffer(), trans_.getBufferPosition(), size, StandardCharsets.UTF_8 );
+ trans_.consumeBuffer( size );
+ return s;
}
return readStringBody( size );
}
+ @Override
public ByteBuffer readBinary() throws TException
{
int size = readI32();
@@ -120,4 +186,3 @@ public class TBinaryProtocolSafe extends TBinaryProtocol
}
}
-
diff --git a/src/main/java/org/openslx/thrifthelper/TConst.java b/src/main/java/org/openslx/thrifthelper/TConst.java
index 2ff902e..b7debba 100644
--- a/src/main/java/org/openslx/thrifthelper/TConst.java
+++ b/src/main/java/org/openslx/thrifthelper/TConst.java
@@ -9,5 +9,6 @@ public class TConst
public static final String VIRT_VMWARE = "vmware";
public static final String VIRT_VIRTUALBOX = "virtualbox";
public static final String VIRT_QEMU = "qemukvm";
+ public static final String VIRT_DOCKER = "docker";
}
diff --git a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java
index 05563d0..37c2897 100644
--- a/src/main/java/org/openslx/thrifthelper/ThriftHandler.java
+++ b/src/main/java/org/openslx/thrifthelper/ThriftHandler.java
@@ -7,51 +7,93 @@ import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
+import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.transport.TTransportException;
import org.openslx.thrifthelper.ThriftManager.ErrorCallback;
+import org.openslx.util.QuickTimer;
+import org.openslx.util.QuickTimer.Task;
class ThriftHandler<T extends TServiceClient> implements InvocationHandler
{
- private final static Logger LOGGER = Logger.getLogger( ThriftHandler.class );
+ private final static Logger LOGGER = LogManager.getLogger( ThriftHandler.class );
+
+ /**
+ * How long a client/connection should be allowed to be idle before we get rid of it and open a fresh one
+ */
+ private final static long MAX_IDLE_MS = 110_000;
protected interface WantClientCallback<T>
{
public T getNewClient();
}
- private final Deque<T> pool = new ArrayDeque<>();
+ /** Pool of (potentially) good connections, waiting for reuse */
+ private final Deque<TWrap<T>> pool = new ArrayDeque<>();
+ /** Old connections to be released by cleanup task */
+ private Deque<TWrap<T>> trash = new ArrayDeque<>();
- private final WantClientCallback<? extends T> callback;
+ private final WantClientCallback<? extends T> clientFactory;
private final ErrorCallback errorCallback;
private final Set<String> thriftMethods;
- public ThriftHandler( final Class<? extends T> clazz, WantClientCallback<? extends T> cb, ErrorCallback errCb )
+ public ThriftHandler( final Class<? extends T> clazz, WantClientCallback<? extends T> clientFactory, ErrorCallback errCb )
{
- errorCallback = errCb;
- callback = cb;
+ this.errorCallback = errCb;
+ this.clientFactory = clientFactory;
Set<String> tmpset = new HashSet<String>();
Method[] methods = clazz.getMethods();
+ // Iterate over all methods of this class
for ( int i = 0; i < methods.length; i++ ) {
boolean thrift = false;
+ // If a method throws some form of TException, consider it a potential method from the thrift interface
Class<?>[] type = methods[i].getExceptionTypes();
for ( int e = 0; e < type.length; e++ ) {
- if ( TException.class.isAssignableFrom( type[e] ) )
+ if ( TException.class.isAssignableFrom( type[e] ) ) {
thrift = true;
+ break;
+ }
}
String name = methods[i].getName();
if ( thrift && !name.startsWith( "send_" ) && !name.startsWith( "recv_" ) ) {
+ // Exclude the send_ and recv_ helpers
tmpset.add( name );
}
}
thriftMethods = Collections.unmodifiableSet( tmpset );
+ // Periodically scan for old connections, in case the application idles for extended periods of time...
+ QuickTimer.scheduleAtFixedDelay( new Task() {
+ @Override
+ public void fire()
+ {
+ Deque<TWrap<T>> newTrash = new ArrayDeque<>();
+ Deque<TWrap<T>> list;
+ long now = System.currentTimeMillis();
+ synchronized ( pool ) {
+ list = trash;
+ trash = newTrash;
+ for ( Iterator<TWrap<T>> it = pool.iterator(); it.hasNext(); ) {
+ TWrap<T> client = it.next();
+ if ( client.deadline < now ) {
+ list.add( client );
+ it.remove();
+ }
+ }
+ }
+ for ( TWrap<T> client : list ) {
+ freeClient( client );
+ }
+ }
+ }, MAX_IDLE_MS * 5, MAX_IDLE_MS * 5 );
}
@Override
@@ -63,31 +105,33 @@ class ThriftHandler<T extends TServiceClient> implements InvocationHandler
throw new IllegalAccessException( "Cannot call this method on a proxied thrift client" );
}
- T client = getClient();
+ TWrap<T> clientWrap = getClient();
try {
Throwable cause = null;
- for ( int i = 1;; i++ ) {
- if ( client != null ) {
- try {
- return method.invoke( client, args );
- } catch ( InvocationTargetException e ) {
- cause = e.getCause();
- if ( cause != null && ! ( cause instanceof TException ) ) {
- throw cause;
- }
- freeClient( client );
- client = null;
- if ( cause == null ) {
- cause = e;
- }
+ for ( int i = 1; clientWrap != null; i++ ) {
+ try {
+ return method.invoke( clientWrap.client, args );
+ } catch ( InvocationTargetException e ) {
+ cause = e.getCause(); // Get original exception
+ if ( cause != null && ! ( cause instanceof TTransportException )
+ && ! ( cause instanceof TProtocolException ) ) {
+ // If it's not an exception potentially hinting at dead connection, just pass it on
+ throw cause;
+ }
+ // Get rid of broken connection
+ freeClient( clientWrap );
+ clientWrap = null;
+ if ( cause == null ) {
+ cause = e;
}
}
// Call the error callback. As long as true is returned, keep retrying
if ( !errorCallback.thriftError( i, method.getName(), cause ) ) {
break;
}
- if ( client == null ) {
- client = getClient();
+ // Apparently we should retry, get another client
+ if ( clientWrap == null ) {
+ clientWrap = getClient();
cause = null;
}
}
@@ -97,44 +141,69 @@ class ThriftHandler<T extends TServiceClient> implements InvocationHandler
throw cause;
throw new TTransportException( "Could not connect" );
} finally {
- returnClient( client );
+ returnClient( clientWrap );
}
}
- private void freeClient( T client )
+ private void freeClient( TWrap<T> client )
{
try {
- client.getInputProtocol().getTransport().close();
+ client.client.getInputProtocol().getTransport().close();
} catch ( Exception e ) {
}
try {
- client.getOutputProtocol().getTransport().close();
+ client.client.getOutputProtocol().getTransport().close();
} catch ( Exception e ) {
}
}
- private T getClient()
+ /**
+ * Get an available client connection. Prefer connection from pool,
+ * honoring the max idle time. If none are available, create a fresh
+ * client connection.
+ */
+ private TWrap<T> getClient()
{
- T client;
+ long now = System.currentTimeMillis();
synchronized ( pool ) {
- client = pool.poll();
- if ( client != null ) {
- return client;
+ TWrap<T> client;
+ while ( ( client = pool.poll() ) != null ) {
+ if ( client.deadline >= now )
+ return client; // Still fresh
+ trash.add( client );
}
}
- // Pool is closed, create new client
+ // No usable existing connection, create new client
LOGGER.debug( "Creating new thrift client" );
- return callback.getNewClient();
+ T client = clientFactory.getNewClient();
+ if ( client == null )
+ return null;
+ return new TWrap<T>( client );
}
- private void returnClient( T client )
+ /**
+ * Return a client connection to the pool, updating its last
+ * use timestamp for proper idle timeout handling.
+ */
+ private void returnClient( TWrap<T> client )
{
if ( client == null )
return;
+ client.deadline = System.currentTimeMillis() + MAX_IDLE_MS;
synchronized ( pool ) {
pool.push( client );
}
}
+
+ private static class TWrap<T>
+ {
+ private final T client;
+ private long deadline;
+ public TWrap(T client)
+ {
+ this.client = client;
+ }
+ }
}
diff --git a/src/main/java/org/openslx/thrifthelper/ThriftManager.java b/src/main/java/org/openslx/thrifthelper/ThriftManager.java
index 2fe706d..07256b2 100644
--- a/src/main/java/org/openslx/thrifthelper/ThriftManager.java
+++ b/src/main/java/org/openslx/thrifthelper/ThriftManager.java
@@ -8,13 +8,14 @@ import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.layered.TFramedTransport;
import org.openslx.bwlp.thrift.iface.MasterServer;
import org.openslx.bwlp.thrift.iface.SatelliteServer;
import org.openslx.thrifthelper.ThriftHandler.WantClientCallback;
@@ -23,7 +24,7 @@ import org.openslx.util.Util;
public class ThriftManager<T>
{
- private final static Logger LOGGER = Logger.getLogger( ThriftManager.class );
+ private final static Logger LOGGER = LogManager.getLogger( ThriftManager.class );
public interface ErrorCallback
{
@@ -40,14 +41,13 @@ public class ThriftManager<T>
private final T client;
- @SuppressWarnings( "unchecked" )
private ThriftManager( Class<T> ifClazz, Class<? extends TServiceClient> clientClazz,
WantClientCallback<? extends TServiceClient> internalCallback, ErrorCallback errorCb )
{
- this.client = (T)Proxy.newProxyInstance(
+ this.client = ifClazz.cast( Proxy.newProxyInstance(
ifClazz.getClassLoader(),
new Class[] { ifClazz }, new ThriftHandler<TServiceClient>(
- clientClazz, internalCallback, errorCb ) );
+ clientClazz, internalCallback, errorCb ) ) );
}
private static ThriftManager<MasterServer.Iface> masterManager = null;
@@ -55,6 +55,7 @@ public class ThriftManager<T>
private static ErrorCallback satErrorCallback = null;
private static ErrorCallback masterErrorCallback = null;
+ private static SSLContext satSslContext = null;
/**
* Sets the address of the master server
@@ -106,6 +107,7 @@ public class ThriftManager<T>
LOGGER.error( "Given address is empty." );
return false;
}
+ satSslContext = ctx;
// finally set it
satelliteManager = new ThriftManager<SatelliteServer.Iface>( SatelliteServer.Iface.class, SatelliteServer.Client.class,
new WantClientCallback<SatelliteServer.Client>() {
@@ -123,6 +125,14 @@ public class ThriftManager<T>
} );
return true;
}
+
+ /**
+ * Get the SSL context used for talking to the satellite server (if any)
+ */
+ public static SSLContext getSatelliteSslContext()
+ {
+ return satSslContext;
+ }
/**
* Returns the singleton client of the thrift connection to the satellite