From 49a993786c6435cc17780241dd205a2ca1a818a2 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 7 Jan 2020 15:34:57 +0100 Subject: Add TCP interface Supports password protection --- .../taskmanager/network/NetworkHandlerTcp.java | 193 +++++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java (limited to 'daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java') diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java new file mode 100644 index 0000000..e37b3d7 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java @@ -0,0 +1,193 @@ +package org.openslx.taskmanager.network; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.Global; + +/** + * The network listener that will receive incoming UDP packets, try to process + * them, and then send a reply. + */ +public class NetworkHandlerTcp extends NetworkHandlerBase +{ + + private static final Logger log = Logger.getLogger( NetworkHandlerTcp.class ); + + /** + * UDP socket for sending and receiving. + */ + private final ServerSocket socket; + + private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 8, 1, TimeUnit.MINUTES, new SynchronousQueue() ); + + /** + * Initialize the NetworkHandler by starting threads and opening the socket. + */ + public NetworkHandlerTcp( int port, InetAddress listenAddress, RequestParser parser ) throws IOException + { + super( parser ); + socket = new ServerSocket( port, 10, listenAddress ); + log.info( "Listening on TCP:" + port ); + threadPool.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() ); + } + + public void shutdown() + { + try { + socket.close(); + } catch ( IOException e ) { + } + } + + /** + * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode + */ + @Override + public void run() + { + try { + while ( !Global.doShutdown ) { + Socket client; + try { + client = socket.accept(); + } catch ( IOException e1 ) { + log.warn( "ACCEPT fail", e1 ); + break; + } + try { + threadPool.execute( new ClientTask( client ) ); + } catch ( RejectedExecutionException e ) { + try { + client.close(); + } catch ( IOException e1 ) { + } + } + } + } finally { + Thread.currentThread().interrupt(); + Global.doShutdown = true; + log.info( "UDP receiver finished." ); + } + } + + private class ClientTask implements Runnable + { + private final Socket socket; + private DataOutputStream writer = null; + private DataInputStream reader = null; + + public ClientTask( Socket client ) + { + this.socket = client; + } + + @Override + public void run() + { + try { + try { + socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) ); + reader = new DataInputStream( socket.getInputStream() ); + writer = new DataOutputStream( socket.getOutputStream() ); + } catch ( IOException e ) { + log.info( "IOException on TCP socket when setting up streams", e ); + return; + } + String payload = readMsg( reader ); + if ( payload == null ) + return; + int i = payload.indexOf( ' ' ); + if ( i != -1 ) { + // For future extensibility we throw away everything after the first space (including the space) + payload = payload.substring( 0, i ); + } + if ( !payload.equals( Global.PASSWORD ) ) { + sendMsg( "ERROR,Wrong password" ); + return; + } + while ( !socket.isClosed() ) { + payload = readMsg( reader ); + if ( payload == null ) + return; + byte[] reply; + try { + reply = parser.handle( payload ); + } catch ( Throwable t ) { + log.error( "Exception in RequestParser", t ); + log.error( "Payload was: " + payload ); + continue; + } + if ( reply != null ) { + sendMsg( reply ); + } + } + } catch ( SendException e ) { + log.warn( "Cannot send reply to client", e ); + } finally { + try { + if ( writer != null ) { + writer.flush(); + writer.close(); + } + socket.close(); + } catch ( IOException e ) { + } + } + } + + private void sendMsg( String reply ) throws SendException + { + sendMsg( reply.getBytes( StandardCharsets.UTF_8 ) ); + } + + private void sendMsg( byte[] reply ) throws SendException + { + try { + writer.writeInt( reply.length ); + writer.write( reply ); + } catch ( IOException e ) { + throw new SendException(); + } + } + + private String readMsg( DataInputStream reader ) + { + int bytes; + try { + bytes = reader.readInt(); + } catch ( IOException e ) { + // This should be fine... Client went away + return null; + } + if ( bytes < 0 || bytes > Global.MAX_REQUEST_SIZE ) { + log.info( "Invalid request size: " + bytes ); + return null; + } + if ( bytes == 0 ) + return ""; // Nothing to read + byte[] buffer = new byte[ bytes ]; + try { + reader.readFully( buffer ); + } catch ( IOException e ) { + log.warn( "Client went away when trying to read payload" ); + return null; + } + return new String( buffer, StandardCharsets.UTF_8 ); + } + + } + + private static class SendException extends Exception {} + +} -- cgit v1.2.3-55-g7522