summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2020-01-07 15:34:57 +0100
committerSimon Rettberg2020-01-07 15:34:57 +0100
commit49a993786c6435cc17780241dd205a2ca1a818a2 (patch)
tree00174300d4d1c933198b96c7c3dbc0c1fe55603f
parentChange thread pool params; allow more concurrent tasks (diff)
downloadtaskman-lite-49a993786c6435cc17780241dd205a2ca1a818a2.tar.gz
taskman-lite-49a993786c6435cc17780241dd205a2ca1a818a2.tar.xz
taskman-lite-49a993786c6435cc17780241dd205a2ca1a818a2.zip
Add TCP interface
Supports password protection
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/App.java26
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/Global.java57
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java12
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java193
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java (renamed from daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java)21
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/util/Util.java9
6 files changed, 299 insertions, 19 deletions
diff --git a/daemon/src/main/java/org/openslx/taskmanager/App.java b/daemon/src/main/java/org/openslx/taskmanager/App.java
index 6b281d6..d64929d 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/App.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/App.java
@@ -1,12 +1,14 @@
package org.openslx.taskmanager;
-import java.net.SocketException;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
import org.openslx.taskmanager.main.Taskmanager;
-import org.openslx.taskmanager.network.NetworkHandler;
+import org.openslx.taskmanager.network.NetworkHandlerTcp;
+import org.openslx.taskmanager.network.NetworkHandlerUdp;
import org.openslx.taskmanager.network.RequestParser;
/**
@@ -15,18 +17,32 @@ import org.openslx.taskmanager.network.RequestParser;
*/
public class App
{
+
+ private static final Logger log = Logger.getLogger( App.class );
- public static void main( String[] args ) throws SocketException, InterruptedException
+ public static void main( String[] args ) throws InterruptedException, IOException
{
BasicConfigurator.configure();
+ if (Global.PORT_UDP == -1 && Global.PORT_TCP == -1) {
+ log.fatal( "Neither UDP nor TCP configured" );
+ System.exit( 1 );
+ }
// Load all task plugins
Environment.load( "config/environment" );
List<Thread> threads = new ArrayList<>();
Taskmanager tm = new Taskmanager();
RequestParser parser = new RequestParser( tm );
- NetworkHandler nh = new NetworkHandler( Global.LISTEN_PORT, Global.LISTEN_ADDRESS, parser );
+ NetworkHandlerUdp udp = null;
+ NetworkHandlerTcp tcp = null;
+ if (Global.PORT_UDP != -1) {
+ udp = new NetworkHandlerUdp( Global.PORT_UDP, Global.LISTEN_ADDRESS, parser );
+ threads.add( new Thread( udp ) );
+ }
+ if (Global.PORT_TCP != -1) {
+ tcp = new NetworkHandlerTcp( Global.PORT_TCP, Global.LISTEN_ADDRESS, parser );
+ threads.add( new Thread( tcp ) );
+ }
threads.add( new Thread( tm ) );
- threads.add( new Thread( nh ) );
// Wait for everything
for (Thread t : threads) {
t.start();
diff --git a/daemon/src/main/java/org/openslx/taskmanager/Global.java b/daemon/src/main/java/org/openslx/taskmanager/Global.java
index 7ca2c2d..5be8196 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/Global.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/Global.java
@@ -1,13 +1,23 @@
package org.openslx.taskmanager;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+import org.openslx.taskmanager.util.Util;
public class Global
{
- public static final int LISTEN_PORT = 9215;
+ private static final Logger log = Logger.getLogger( Global.class );
public static final String TASK_PACKAGE_NAME = "org.openslx.taskmanager.tasks";
@@ -15,10 +25,17 @@ public class Global
public static final InetAddress LISTEN_ADDRESS;
+ public static final int MAX_REQUEST_SIZE = 1 * 1024 * 1024; // 1 Meg
+
+ public static final String PASSWORD;
+
+ public static final int PORT_UDP;
+
+ public static final int PORT_TCP;
+
public static volatile boolean doShutdown = false;
- static
- {
+ static {
InetAddress la;
try {
la = Inet4Address.getByName( "127.0.0.1" );
@@ -27,6 +44,40 @@ public class Global
e.printStackTrace();
}
LISTEN_ADDRESS = la;
+
+ String pw = "";
+ int udp = 9215, tcp = -1;
+ Path configPath = Paths.get( "config/config" );
+ if ( Files.exists( configPath ) ) {
+ log.info( "Loading config from " + configPath.toAbsolutePath().toString() );
+ Properties p = new Properties();
+ try {
+ p.load( new FileInputStream( configPath.toFile() ) );
+ pw = p.getProperty( "password" );
+ } catch ( Exception e ) {
+ log.warn( "Cannot read from config file", e );
+ }
+ udp = Util.parseInt( p.getProperty( "udp", "-1" ), -1 );
+ tcp = Util.parseInt( p.getProperty( "tcp", "-1" ), -1 );
+ if ( !pw.isEmpty() ) {
+ try {
+ if ( Files.getPosixFilePermissions( configPath ).contains( PosixFilePermission.OTHERS_READ ) ) {
+ log.warn( "******** Config file is world readable" );
+ }
+ } catch ( IOException e1 ) {
+ e1.printStackTrace();
+ }
+ }
+ }
+ if ( udp != -1 ) {
+ log.warn( "******** Running with passwordless legacy UDP interface" );
+ }
+ if ( tcp != -1 && pw.isEmpty() ) {
+ log.warn( "******** Running with no password" );
+ }
+ PASSWORD = pw;
+ PORT_UDP = udp;
+ PORT_TCP = tcp;
}
}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java
new file mode 100644
index 0000000..03dc32f
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java
@@ -0,0 +1,12 @@
+package org.openslx.taskmanager.network;
+
+public abstract class NetworkHandlerBase implements Runnable
+{
+ protected final RequestParser parser;
+
+ public NetworkHandlerBase( RequestParser parser )
+ {
+ this.parser = parser;
+ }
+
+}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java
new file mode 100644
index 0000000..e37b3d7
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java
@@ -0,0 +1,193 @@
+package org.openslx.taskmanager.network;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.openslx.taskmanager.Global;
+
+/**
+ * The network listener that will receive incoming UDP packets, try to process
+ * them, and then send a reply.
+ */
+public class NetworkHandlerTcp extends NetworkHandlerBase
+{
+
+ private static final Logger log = Logger.getLogger( NetworkHandlerTcp.class );
+
+ /**
+ * UDP socket for sending and receiving.
+ */
+ private final ServerSocket socket;
+
+ private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 8, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
+
+ /**
+ * Initialize the NetworkHandler by starting threads and opening the socket.
+ */
+ public NetworkHandlerTcp( int port, InetAddress listenAddress, RequestParser parser ) throws IOException
+ {
+ super( parser );
+ socket = new ServerSocket( port, 10, listenAddress );
+ log.info( "Listening on TCP:" + port );
+ threadPool.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() );
+ }
+
+ public void shutdown()
+ {
+ try {
+ socket.close();
+ } catch ( IOException e ) {
+ }
+ }
+
+ /**
+ * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode
+ */
+ @Override
+ public void run()
+ {
+ try {
+ while ( !Global.doShutdown ) {
+ Socket client;
+ try {
+ client = socket.accept();
+ } catch ( IOException e1 ) {
+ log.warn( "ACCEPT fail", e1 );
+ break;
+ }
+ try {
+ threadPool.execute( new ClientTask( client ) );
+ } catch ( RejectedExecutionException e ) {
+ try {
+ client.close();
+ } catch ( IOException e1 ) {
+ }
+ }
+ }
+ } finally {
+ Thread.currentThread().interrupt();
+ Global.doShutdown = true;
+ log.info( "UDP receiver finished." );
+ }
+ }
+
+ private class ClientTask implements Runnable
+ {
+ private final Socket socket;
+ private DataOutputStream writer = null;
+ private DataInputStream reader = null;
+
+ public ClientTask( Socket client )
+ {
+ this.socket = client;
+ }
+
+ @Override
+ public void run()
+ {
+ try {
+ try {
+ socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) );
+ reader = new DataInputStream( socket.getInputStream() );
+ writer = new DataOutputStream( socket.getOutputStream() );
+ } catch ( IOException e ) {
+ log.info( "IOException on TCP socket when setting up streams", e );
+ return;
+ }
+ String payload = readMsg( reader );
+ if ( payload == null )
+ return;
+ int i = payload.indexOf( ' ' );
+ if ( i != -1 ) {
+ // For future extensibility we throw away everything after the first space (including the space)
+ payload = payload.substring( 0, i );
+ }
+ if ( !payload.equals( Global.PASSWORD ) ) {
+ sendMsg( "ERROR,Wrong password" );
+ return;
+ }
+ while ( !socket.isClosed() ) {
+ payload = readMsg( reader );
+ if ( payload == null )
+ return;
+ byte[] reply;
+ try {
+ reply = parser.handle( payload );
+ } catch ( Throwable t ) {
+ log.error( "Exception in RequestParser", t );
+ log.error( "Payload was: " + payload );
+ continue;
+ }
+ if ( reply != null ) {
+ sendMsg( reply );
+ }
+ }
+ } catch ( SendException e ) {
+ log.warn( "Cannot send reply to client", e );
+ } finally {
+ try {
+ if ( writer != null ) {
+ writer.flush();
+ writer.close();
+ }
+ socket.close();
+ } catch ( IOException e ) {
+ }
+ }
+ }
+
+ private void sendMsg( String reply ) throws SendException
+ {
+ sendMsg( reply.getBytes( StandardCharsets.UTF_8 ) );
+ }
+
+ private void sendMsg( byte[] reply ) throws SendException
+ {
+ try {
+ writer.writeInt( reply.length );
+ writer.write( reply );
+ } catch ( IOException e ) {
+ throw new SendException();
+ }
+ }
+
+ private String readMsg( DataInputStream reader )
+ {
+ int bytes;
+ try {
+ bytes = reader.readInt();
+ } catch ( IOException e ) {
+ // This should be fine... Client went away
+ return null;
+ }
+ if ( bytes < 0 || bytes > Global.MAX_REQUEST_SIZE ) {
+ log.info( "Invalid request size: " + bytes );
+ return null;
+ }
+ if ( bytes == 0 )
+ return ""; // Nothing to read
+ byte[] buffer = new byte[ bytes ];
+ try {
+ reader.readFully( buffer );
+ } catch ( IOException e ) {
+ log.warn( "Client went away when trying to read payload" );
+ return null;
+ }
+ return new String( buffer, StandardCharsets.UTF_8 );
+ }
+
+ }
+
+ private static class SendException extends Exception {}
+
+}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java
index 6946cd1..4c12960 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java
@@ -17,12 +17,10 @@ import org.openslx.taskmanager.Global;
* The network listener that will receive incoming UDP packets, try to process
* them, and then send a reply.
*/
-public class NetworkHandler implements Runnable
+public class NetworkHandlerUdp extends NetworkHandlerBase
{
- private static final Logger log = Logger.getLogger( NetworkHandler.class );
-
- // Static part
+ private static final Logger log = Logger.getLogger( NetworkHandlerUdp.class );
private Thread sendThread = null;
/**
@@ -34,16 +32,15 @@ public class NetworkHandler implements Runnable
*/
private final DatagramSocket socket;
- private final RequestParser parser;
-
/**
* Initialize the NetworkHandler by starting threads and opening the socket.
*/
- public NetworkHandler( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException
+ public NetworkHandlerUdp( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException
{
+ super(parser);
socket = new DatagramSocket( port, listenAddress );
+ log.info( "Listening on UDP:" + port );
sendThread = new Thread( sender = new Sender() );
- this.parser = parser;
}
public void shutdown()
@@ -100,15 +97,17 @@ public class NetworkHandler implements Runnable
continue;
}
String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
+ byte[] reply;
try {
- byte[] reply = parser.handle( payload );
- if ( reply != null )
- send( packet.getSocketAddress(), reply );
+ reply = parser.handle( payload );
} catch ( Throwable t ) {
log.error( "Exception in RequestParser: " + t.toString() );
log.error( "Payload was: " + payload );
t.printStackTrace();
+ continue;
}
+ if ( reply != null )
+ send( packet.getSocketAddress(), reply );
}
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
diff --git a/daemon/src/main/java/org/openslx/taskmanager/util/Util.java b/daemon/src/main/java/org/openslx/taskmanager/util/Util.java
index bf52ecb..361c0e5 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/util/Util.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/util/Util.java
@@ -20,5 +20,14 @@ public class Util
{
return gsonBuilder.excludeFieldsWithoutExposeAnnotation().create();
}
+
+ public static int parseInt(String str, int def)
+ {
+ try {
+ return Integer.parseInt( str );
+ } catch (Throwable t) {
+ return def;
+ }
+ }
}