From 49a993786c6435cc17780241dd205a2ca1a818a2 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 7 Jan 2020 15:34:57 +0100 Subject: Add TCP interface Supports password protection --- .../taskmanager/network/NetworkHandlerUdp.java | 170 +++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java (limited to 'daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java') diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java new file mode 100644 index 0000000..4c12960 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java @@ -0,0 +1,170 @@ +package org.openslx.taskmanager.network; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.Global; + +/** + * The network listener that will receive incoming UDP packets, try to process + * them, and then send a reply. + */ +public class NetworkHandlerUdp extends NetworkHandlerBase +{ + + private static final Logger log = Logger.getLogger( NetworkHandlerUdp.class ); + + private Thread sendThread = null; + /** + * Sender instance (Runnable handling outgoing packets) + */ + private final Sender sender; + /** + * UDP socket for sending and receiving. + */ + private final DatagramSocket socket; + + /** + * Initialize the NetworkHandler by starting threads and opening the socket. + */ + public NetworkHandlerUdp( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException + { + super(parser); + socket = new DatagramSocket( port, listenAddress ); + log.info( "Listening on UDP:" + port ); + sendThread = new Thread( sender = new Sender() ); + } + + public void shutdown() + { + socket.close(); + } + + // Class part + + /** + * Prepare and enqueue reply for client request. + * Only ever to be called from the receiving thread. The reply message is crafted + * and then handed over to the sending thread. + * + * @param destination SocketAddress of the client + * @param messageId The same ID the client used in it's request. + * It's echoed back to the client to enable request bursts, and has no meaning for the + * server. + * @param status A TaskStatus instance to be serialized to json and sent to the client. + */ + private void send( SocketAddress destination, byte[] buffer ) + { + final DatagramPacket packet; + try { + packet = new DatagramPacket( buffer, buffer.length, destination ); + } catch ( Exception e ) { + log.warn( "Could not construct datagram packet for target " + destination.toString() ); + e.printStackTrace(); + return; + } + sender.send( packet ); + } + + /** + * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode + */ + @Override + public void run() + { + byte readBuffer[] = new byte[ 66000 ]; + try { + sendThread.start(); + while ( !Global.doShutdown ) { + DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length ); + try { + socket.receive( packet ); + } catch ( IOException e ) { + log.info( "IOException on UDP socket when reading: " + e.getMessage() ); + Thread.sleep( 100 ); + continue; + } + if ( packet.getLength() < 2 ) { + log.debug( "Message too short" ); + continue; + } + String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 ); + byte[] reply; + try { + reply = parser.handle( payload ); + } catch ( Throwable t ) { + log.error( "Exception in RequestParser: " + t.toString() ); + log.error( "Payload was: " + payload ); + t.printStackTrace(); + continue; + } + if ( reply != null ) + send( packet.getSocketAddress(), reply ); + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + Global.doShutdown = true; + sendThread.interrupt(); + log.info( "UDP receiver finished." ); + } + } + + /** + * Private sending thread. + * Use blocking queue, wait for packet to be added to it, then try to send. + */ + private class Sender implements Runnable + { + + /** + * Queue to stuff outgoing packets into. + */ + private final BlockingQueue queue = new LinkedBlockingQueue<>( 128 ); + + /** + * Wait until something is put into the queue, then send it. + */ + @Override + public void run() + { + try { + while ( !Global.doShutdown ) { + final DatagramPacket packet; + packet = queue.take(); + try { + socket.send( packet ); + } catch ( IOException e ) { + log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString(), e ); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + Global.doShutdown = true; + log.info( "UDP sender finished." ); + } + } + + /** + * Add something to the outgoing packet queue. + * Called from the receiving thread. + */ + public void send( DatagramPacket packet ) + { + if ( queue.offer( packet ) ) + return; + log.warn( "Could not add packet to queue: Full" ); + } + + } + +} -- cgit v1.2.3-55-g7522