From 49a993786c6435cc17780241dd205a2ca1a818a2 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 7 Jan 2020 15:34:57 +0100 Subject: Add TCP interface Supports password protection --- .../src/main/java/org/openslx/taskmanager/App.java | 26 ++- .../main/java/org/openslx/taskmanager/Global.java | 57 +++++- .../taskmanager/network/NetworkHandler.java | 171 ------------------ .../taskmanager/network/NetworkHandlerBase.java | 12 ++ .../taskmanager/network/NetworkHandlerTcp.java | 193 +++++++++++++++++++++ .../taskmanager/network/NetworkHandlerUdp.java | 170 ++++++++++++++++++ .../java/org/openslx/taskmanager/util/Util.java | 9 + 7 files changed, 459 insertions(+), 179 deletions(-) delete mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java diff --git a/daemon/src/main/java/org/openslx/taskmanager/App.java b/daemon/src/main/java/org/openslx/taskmanager/App.java index 6b281d6..d64929d 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/App.java +++ b/daemon/src/main/java/org/openslx/taskmanager/App.java @@ -1,12 +1,14 @@ package org.openslx.taskmanager; -import java.net.SocketException; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; import org.openslx.taskmanager.main.Taskmanager; -import org.openslx.taskmanager.network.NetworkHandler; +import org.openslx.taskmanager.network.NetworkHandlerTcp; +import org.openslx.taskmanager.network.NetworkHandlerUdp; import org.openslx.taskmanager.network.RequestParser; /** @@ -15,18 +17,32 @@ import org.openslx.taskmanager.network.RequestParser; */ public class App { + + private static final Logger log = Logger.getLogger( App.class ); - public static void main( String[] args ) throws SocketException, InterruptedException + public static void main( String[] args ) throws InterruptedException, IOException { BasicConfigurator.configure(); + if (Global.PORT_UDP == -1 && Global.PORT_TCP == -1) { + log.fatal( "Neither UDP nor TCP configured" ); + System.exit( 1 ); + } // Load all task plugins Environment.load( "config/environment" ); List threads = new ArrayList<>(); Taskmanager tm = new Taskmanager(); RequestParser parser = new RequestParser( tm ); - NetworkHandler nh = new NetworkHandler( Global.LISTEN_PORT, Global.LISTEN_ADDRESS, parser ); + NetworkHandlerUdp udp = null; + NetworkHandlerTcp tcp = null; + if (Global.PORT_UDP != -1) { + udp = new NetworkHandlerUdp( Global.PORT_UDP, Global.LISTEN_ADDRESS, parser ); + threads.add( new Thread( udp ) ); + } + if (Global.PORT_TCP != -1) { + tcp = new NetworkHandlerTcp( Global.PORT_TCP, Global.LISTEN_ADDRESS, parser ); + threads.add( new Thread( tcp ) ); + } threads.add( new Thread( tm ) ); - threads.add( new Thread( nh ) ); // Wait for everything for (Thread t : threads) { t.start(); diff --git a/daemon/src/main/java/org/openslx/taskmanager/Global.java b/daemon/src/main/java/org/openslx/taskmanager/Global.java index 7ca2c2d..5be8196 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/Global.java +++ b/daemon/src/main/java/org/openslx/taskmanager/Global.java @@ -1,13 +1,23 @@ package org.openslx.taskmanager; +import java.io.FileInputStream; +import java.io.IOException; import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFilePermission; +import java.util.Properties; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.util.Util; public class Global { - public static final int LISTEN_PORT = 9215; + private static final Logger log = Logger.getLogger( Global.class ); public static final String TASK_PACKAGE_NAME = "org.openslx.taskmanager.tasks"; @@ -15,10 +25,17 @@ public class Global public static final InetAddress LISTEN_ADDRESS; + public static final int MAX_REQUEST_SIZE = 1 * 1024 * 1024; // 1 Meg + + public static final String PASSWORD; + + public static final int PORT_UDP; + + public static final int PORT_TCP; + public static volatile boolean doShutdown = false; - static - { + static { InetAddress la; try { la = Inet4Address.getByName( "127.0.0.1" ); @@ -27,6 +44,40 @@ public class Global e.printStackTrace(); } LISTEN_ADDRESS = la; + + String pw = ""; + int udp = 9215, tcp = -1; + Path configPath = Paths.get( "config/config" ); + if ( Files.exists( configPath ) ) { + log.info( "Loading config from " + configPath.toAbsolutePath().toString() ); + Properties p = new Properties(); + try { + p.load( new FileInputStream( configPath.toFile() ) ); + pw = p.getProperty( "password" ); + } catch ( Exception e ) { + log.warn( "Cannot read from config file", e ); + } + udp = Util.parseInt( p.getProperty( "udp", "-1" ), -1 ); + tcp = Util.parseInt( p.getProperty( "tcp", "-1" ), -1 ); + if ( !pw.isEmpty() ) { + try { + if ( Files.getPosixFilePermissions( configPath ).contains( PosixFilePermission.OTHERS_READ ) ) { + log.warn( "******** Config file is world readable" ); + } + } catch ( IOException e1 ) { + e1.printStackTrace(); + } + } + } + if ( udp != -1 ) { + log.warn( "******** Running with passwordless legacy UDP interface" ); + } + if ( tcp != -1 && pw.isEmpty() ) { + log.warn( "******** Running with no password" ); + } + PASSWORD = pw; + PORT_UDP = udp; + PORT_TCP = tcp; } } diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java deleted file mode 100644 index 6946cd1..0000000 --- a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java +++ /dev/null @@ -1,171 +0,0 @@ -package org.openslx.taskmanager.network; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.SocketAddress; -import java.net.SocketException; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.log4j.Logger; -import org.openslx.taskmanager.Global; - -/** - * The network listener that will receive incoming UDP packets, try to process - * them, and then send a reply. - */ -public class NetworkHandler implements Runnable -{ - - private static final Logger log = Logger.getLogger( NetworkHandler.class ); - - // Static part - - private Thread sendThread = null; - /** - * Sender instance (Runnable handling outgoing packets) - */ - private final Sender sender; - /** - * UDP socket for sending and receiving. - */ - private final DatagramSocket socket; - - private final RequestParser parser; - - /** - * Initialize the NetworkHandler by starting threads and opening the socket. - */ - public NetworkHandler( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException - { - socket = new DatagramSocket( port, listenAddress ); - sendThread = new Thread( sender = new Sender() ); - this.parser = parser; - } - - public void shutdown() - { - socket.close(); - } - - // Class part - - /** - * Prepare and enqueue reply for client request. - * Only ever to be called from the receiving thread. The reply message is crafted - * and then handed over to the sending thread. - * - * @param destination SocketAddress of the client - * @param messageId The same ID the client used in it's request. - * It's echoed back to the client to enable request bursts, and has no meaning for the - * server. - * @param status A TaskStatus instance to be serialized to json and sent to the client. - */ - private void send( SocketAddress destination, byte[] buffer ) - { - final DatagramPacket packet; - try { - packet = new DatagramPacket( buffer, buffer.length, destination ); - } catch ( Exception e ) { - log.warn( "Could not construct datagram packet for target " + destination.toString() ); - e.printStackTrace(); - return; - } - sender.send( packet ); - } - - /** - * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode - */ - @Override - public void run() - { - byte readBuffer[] = new byte[ 66000 ]; - try { - sendThread.start(); - while ( !Global.doShutdown ) { - DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length ); - try { - socket.receive( packet ); - } catch ( IOException e ) { - log.info( "IOException on UDP socket when reading: " + e.getMessage() ); - Thread.sleep( 100 ); - continue; - } - if ( packet.getLength() < 2 ) { - log.debug( "Message too short" ); - continue; - } - String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 ); - try { - byte[] reply = parser.handle( payload ); - if ( reply != null ) - send( packet.getSocketAddress(), reply ); - } catch ( Throwable t ) { - log.error( "Exception in RequestParser: " + t.toString() ); - log.error( "Payload was: " + payload ); - t.printStackTrace(); - } - } - } catch ( InterruptedException e ) { - Thread.currentThread().interrupt(); - } finally { - Global.doShutdown = true; - sendThread.interrupt(); - log.info( "UDP receiver finished." ); - } - } - - /** - * Private sending thread. - * Use blocking queue, wait for packet to be added to it, then try to send. - */ - private class Sender implements Runnable - { - - /** - * Queue to stuff outgoing packets into. - */ - private final BlockingQueue queue = new LinkedBlockingQueue<>( 128 ); - - /** - * Wait until something is put into the queue, then send it. - */ - @Override - public void run() - { - try { - while ( !Global.doShutdown ) { - final DatagramPacket packet; - packet = queue.take(); - try { - socket.send( packet ); - } catch ( IOException e ) { - log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString(), e ); - } - } - } catch ( InterruptedException e ) { - Thread.currentThread().interrupt(); - } finally { - Global.doShutdown = true; - log.info( "UDP sender finished." ); - } - } - - /** - * Add something to the outgoing packet queue. - * Called from the receiving thread. - */ - public void send( DatagramPacket packet ) - { - if ( queue.offer( packet ) ) - return; - log.warn( "Could not add packet to queue: Full" ); - } - - } - -} diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java new file mode 100644 index 0000000..03dc32f --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java @@ -0,0 +1,12 @@ +package org.openslx.taskmanager.network; + +public abstract class NetworkHandlerBase implements Runnable +{ + protected final RequestParser parser; + + public NetworkHandlerBase( RequestParser parser ) + { + this.parser = parser; + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java new file mode 100644 index 0000000..e37b3d7 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java @@ -0,0 +1,193 @@ +package org.openslx.taskmanager.network; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.Global; + +/** + * The network listener that will receive incoming UDP packets, try to process + * them, and then send a reply. + */ +public class NetworkHandlerTcp extends NetworkHandlerBase +{ + + private static final Logger log = Logger.getLogger( NetworkHandlerTcp.class ); + + /** + * UDP socket for sending and receiving. + */ + private final ServerSocket socket; + + private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 8, 1, TimeUnit.MINUTES, new SynchronousQueue() ); + + /** + * Initialize the NetworkHandler by starting threads and opening the socket. + */ + public NetworkHandlerTcp( int port, InetAddress listenAddress, RequestParser parser ) throws IOException + { + super( parser ); + socket = new ServerSocket( port, 10, listenAddress ); + log.info( "Listening on TCP:" + port ); + threadPool.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() ); + } + + public void shutdown() + { + try { + socket.close(); + } catch ( IOException e ) { + } + } + + /** + * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode + */ + @Override + public void run() + { + try { + while ( !Global.doShutdown ) { + Socket client; + try { + client = socket.accept(); + } catch ( IOException e1 ) { + log.warn( "ACCEPT fail", e1 ); + break; + } + try { + threadPool.execute( new ClientTask( client ) ); + } catch ( RejectedExecutionException e ) { + try { + client.close(); + } catch ( IOException e1 ) { + } + } + } + } finally { + Thread.currentThread().interrupt(); + Global.doShutdown = true; + log.info( "UDP receiver finished." ); + } + } + + private class ClientTask implements Runnable + { + private final Socket socket; + private DataOutputStream writer = null; + private DataInputStream reader = null; + + public ClientTask( Socket client ) + { + this.socket = client; + } + + @Override + public void run() + { + try { + try { + socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) ); + reader = new DataInputStream( socket.getInputStream() ); + writer = new DataOutputStream( socket.getOutputStream() ); + } catch ( IOException e ) { + log.info( "IOException on TCP socket when setting up streams", e ); + return; + } + String payload = readMsg( reader ); + if ( payload == null ) + return; + int i = payload.indexOf( ' ' ); + if ( i != -1 ) { + // For future extensibility we throw away everything after the first space (including the space) + payload = payload.substring( 0, i ); + } + if ( !payload.equals( Global.PASSWORD ) ) { + sendMsg( "ERROR,Wrong password" ); + return; + } + while ( !socket.isClosed() ) { + payload = readMsg( reader ); + if ( payload == null ) + return; + byte[] reply; + try { + reply = parser.handle( payload ); + } catch ( Throwable t ) { + log.error( "Exception in RequestParser", t ); + log.error( "Payload was: " + payload ); + continue; + } + if ( reply != null ) { + sendMsg( reply ); + } + } + } catch ( SendException e ) { + log.warn( "Cannot send reply to client", e ); + } finally { + try { + if ( writer != null ) { + writer.flush(); + writer.close(); + } + socket.close(); + } catch ( IOException e ) { + } + } + } + + private void sendMsg( String reply ) throws SendException + { + sendMsg( reply.getBytes( StandardCharsets.UTF_8 ) ); + } + + private void sendMsg( byte[] reply ) throws SendException + { + try { + writer.writeInt( reply.length ); + writer.write( reply ); + } catch ( IOException e ) { + throw new SendException(); + } + } + + private String readMsg( DataInputStream reader ) + { + int bytes; + try { + bytes = reader.readInt(); + } catch ( IOException e ) { + // This should be fine... Client went away + return null; + } + if ( bytes < 0 || bytes > Global.MAX_REQUEST_SIZE ) { + log.info( "Invalid request size: " + bytes ); + return null; + } + if ( bytes == 0 ) + return ""; // Nothing to read + byte[] buffer = new byte[ bytes ]; + try { + reader.readFully( buffer ); + } catch ( IOException e ) { + log.warn( "Client went away when trying to read payload" ); + return null; + } + return new String( buffer, StandardCharsets.UTF_8 ); + } + + } + + private static class SendException extends Exception {} + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java new file mode 100644 index 0000000..4c12960 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java @@ -0,0 +1,170 @@ +package org.openslx.taskmanager.network; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.Global; + +/** + * The network listener that will receive incoming UDP packets, try to process + * them, and then send a reply. + */ +public class NetworkHandlerUdp extends NetworkHandlerBase +{ + + private static final Logger log = Logger.getLogger( NetworkHandlerUdp.class ); + + private Thread sendThread = null; + /** + * Sender instance (Runnable handling outgoing packets) + */ + private final Sender sender; + /** + * UDP socket for sending and receiving. + */ + private final DatagramSocket socket; + + /** + * Initialize the NetworkHandler by starting threads and opening the socket. + */ + public NetworkHandlerUdp( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException + { + super(parser); + socket = new DatagramSocket( port, listenAddress ); + log.info( "Listening on UDP:" + port ); + sendThread = new Thread( sender = new Sender() ); + } + + public void shutdown() + { + socket.close(); + } + + // Class part + + /** + * Prepare and enqueue reply for client request. + * Only ever to be called from the receiving thread. The reply message is crafted + * and then handed over to the sending thread. + * + * @param destination SocketAddress of the client + * @param messageId The same ID the client used in it's request. + * It's echoed back to the client to enable request bursts, and has no meaning for the + * server. + * @param status A TaskStatus instance to be serialized to json and sent to the client. + */ + private void send( SocketAddress destination, byte[] buffer ) + { + final DatagramPacket packet; + try { + packet = new DatagramPacket( buffer, buffer.length, destination ); + } catch ( Exception e ) { + log.warn( "Could not construct datagram packet for target " + destination.toString() ); + e.printStackTrace(); + return; + } + sender.send( packet ); + } + + /** + * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode + */ + @Override + public void run() + { + byte readBuffer[] = new byte[ 66000 ]; + try { + sendThread.start(); + while ( !Global.doShutdown ) { + DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length ); + try { + socket.receive( packet ); + } catch ( IOException e ) { + log.info( "IOException on UDP socket when reading: " + e.getMessage() ); + Thread.sleep( 100 ); + continue; + } + if ( packet.getLength() < 2 ) { + log.debug( "Message too short" ); + continue; + } + String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 ); + byte[] reply; + try { + reply = parser.handle( payload ); + } catch ( Throwable t ) { + log.error( "Exception in RequestParser: " + t.toString() ); + log.error( "Payload was: " + payload ); + t.printStackTrace(); + continue; + } + if ( reply != null ) + send( packet.getSocketAddress(), reply ); + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + Global.doShutdown = true; + sendThread.interrupt(); + log.info( "UDP receiver finished." ); + } + } + + /** + * Private sending thread. + * Use blocking queue, wait for packet to be added to it, then try to send. + */ + private class Sender implements Runnable + { + + /** + * Queue to stuff outgoing packets into. + */ + private final BlockingQueue queue = new LinkedBlockingQueue<>( 128 ); + + /** + * Wait until something is put into the queue, then send it. + */ + @Override + public void run() + { + try { + while ( !Global.doShutdown ) { + final DatagramPacket packet; + packet = queue.take(); + try { + socket.send( packet ); + } catch ( IOException e ) { + log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString(), e ); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + Global.doShutdown = true; + log.info( "UDP sender finished." ); + } + } + + /** + * Add something to the outgoing packet queue. + * Called from the receiving thread. + */ + public void send( DatagramPacket packet ) + { + if ( queue.offer( packet ) ) + return; + log.warn( "Could not add packet to queue: Full" ); + } + + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/util/Util.java b/daemon/src/main/java/org/openslx/taskmanager/util/Util.java index bf52ecb..361c0e5 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/util/Util.java +++ b/daemon/src/main/java/org/openslx/taskmanager/util/Util.java @@ -20,5 +20,14 @@ public class Util { return gsonBuilder.excludeFieldsWithoutExposeAnnotation().create(); } + + public static int parseInt(String str, int def) + { + try { + return Integer.parseInt( str ); + } catch (Throwable t) { + return def; + } + } } -- cgit v1.2.3-55-g7522