summaryrefslogtreecommitdiffstats
path: root/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java
diff options
context:
space:
mode:
Diffstat (limited to 'daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java')
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java170
1 files changed, 170 insertions, 0 deletions
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java
new file mode 100644
index 0000000..4c12960
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java
@@ -0,0 +1,170 @@
+package org.openslx.taskmanager.network;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+import org.openslx.taskmanager.Global;
+
+/**
+ * The network listener that will receive incoming UDP packets, try to process
+ * them, and then send a reply.
+ */
+public class NetworkHandlerUdp extends NetworkHandlerBase
+{
+
+ private static final Logger log = Logger.getLogger( NetworkHandlerUdp.class );
+
+ private Thread sendThread = null;
+ /**
+ * Sender instance (Runnable handling outgoing packets)
+ */
+ private final Sender sender;
+ /**
+ * UDP socket for sending and receiving.
+ */
+ private final DatagramSocket socket;
+
+ /**
+ * Initialize the NetworkHandler by starting threads and opening the socket.
+ */
+ public NetworkHandlerUdp( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException
+ {
+ super(parser);
+ socket = new DatagramSocket( port, listenAddress );
+ log.info( "Listening on UDP:" + port );
+ sendThread = new Thread( sender = new Sender() );
+ }
+
+ public void shutdown()
+ {
+ socket.close();
+ }
+
+ // Class part
+
+ /**
+ * Prepare and enqueue reply for client request.
+ * Only ever to be called from the receiving thread. The reply message is crafted
+ * and then handed over to the sending thread.
+ *
+ * @param destination SocketAddress of the client
+ * @param messageId The same ID the client used in it's request.
+ * It's echoed back to the client to enable request bursts, and has no meaning for the
+ * server.
+ * @param status A TaskStatus instance to be serialized to json and sent to the client.
+ */
+ private void send( SocketAddress destination, byte[] buffer )
+ {
+ final DatagramPacket packet;
+ try {
+ packet = new DatagramPacket( buffer, buffer.length, destination );
+ } catch ( Exception e ) {
+ log.warn( "Could not construct datagram packet for target " + destination.toString() );
+ e.printStackTrace();
+ return;
+ }
+ sender.send( packet );
+ }
+
+ /**
+ * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode
+ */
+ @Override
+ public void run()
+ {
+ byte readBuffer[] = new byte[ 66000 ];
+ try {
+ sendThread.start();
+ while ( !Global.doShutdown ) {
+ DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length );
+ try {
+ socket.receive( packet );
+ } catch ( IOException e ) {
+ log.info( "IOException on UDP socket when reading: " + e.getMessage() );
+ Thread.sleep( 100 );
+ continue;
+ }
+ if ( packet.getLength() < 2 ) {
+ log.debug( "Message too short" );
+ continue;
+ }
+ String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
+ byte[] reply;
+ try {
+ reply = parser.handle( payload );
+ } catch ( Throwable t ) {
+ log.error( "Exception in RequestParser: " + t.toString() );
+ log.error( "Payload was: " + payload );
+ t.printStackTrace();
+ continue;
+ }
+ if ( reply != null )
+ send( packet.getSocketAddress(), reply );
+ }
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ } finally {
+ Global.doShutdown = true;
+ sendThread.interrupt();
+ log.info( "UDP receiver finished." );
+ }
+ }
+
+ /**
+ * Private sending thread.
+ * Use blocking queue, wait for packet to be added to it, then try to send.
+ */
+ private class Sender implements Runnable
+ {
+
+ /**
+ * Queue to stuff outgoing packets into.
+ */
+ private final BlockingQueue<DatagramPacket> queue = new LinkedBlockingQueue<>( 128 );
+
+ /**
+ * Wait until something is put into the queue, then send it.
+ */
+ @Override
+ public void run()
+ {
+ try {
+ while ( !Global.doShutdown ) {
+ final DatagramPacket packet;
+ packet = queue.take();
+ try {
+ socket.send( packet );
+ } catch ( IOException e ) {
+ log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString(), e );
+ }
+ }
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ } finally {
+ Global.doShutdown = true;
+ log.info( "UDP sender finished." );
+ }
+ }
+
+ /**
+ * Add something to the outgoing packet queue.
+ * Called from the receiving thread.
+ */
+ public void send( DatagramPacket packet )
+ {
+ if ( queue.offer( packet ) )
+ return;
+ log.warn( "Could not add packet to queue: Full" );
+ }
+
+ }
+
+}