diff options
author | Simon Rettberg | 2014-06-03 16:44:56 +0200 |
---|---|---|
committer | Simon Rettberg | 2014-06-03 16:44:56 +0200 |
commit | efb5ad9f5fe48a77b6cd14e7bd2b25e3b13ecb1f (patch) | |
tree | ab7310095194cea2a74acd8aac782b1ec95e508d /daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java | |
download | taskman-lite-efb5ad9f5fe48a77b6cd14e7bd2b25e3b13ecb1f.tar.gz taskman-lite-efb5ad9f5fe48a77b6cd14e7bd2b25e3b13ecb1f.tar.xz taskman-lite-efb5ad9f5fe48a77b6cd14e7bd2b25e3b13ecb1f.zip |
Initial commit
Diffstat (limited to 'daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java')
-rw-r--r-- | daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java | 180 |
1 files changed, 180 insertions, 0 deletions
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java new file mode 100644 index 0000000..3e2c8fd --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java @@ -0,0 +1,180 @@ +package org.openslx.taskmanager.network; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.Global; + +/** + * The network listener that will receive incoming UDP packets, try to process + * them, and then send a reply. + */ +public class NetworkHandler implements Runnable +{ + + private static final Logger log = Logger.getLogger( NetworkHandler.class ); + + // Static part + + private static Thread recvThread = null; + private static Thread sendThread = null; + /** + * Sender instance (Runnable handling outgoing packets) + */ + private static Sender sender = null; + /** + * UDP socket for sending and receiving. + */ + private static DatagramSocket socket; + + /** + * Initialize the NetworkHandler by starting threads and opening the socket. + */ + public static void init() throws SocketException + { + if ( recvThread != null ) + throw new RuntimeException( "Already initialized" ); + socket = new DatagramSocket( Global.LISTEN_PORT, Global.LISTEN_ADDRESS ); + recvThread = new Thread( new NetworkHandler() ); + recvThread.start(); + sendThread = new Thread( sender = new Sender() ); + sendThread.start(); + } + + public static void shutdown() + { + socket.close(); + } + + public static void join() + { + try { + recvThread.join(); + sendThread.join(); + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } + } + + // Class part + + /** + * Prepare and enqueue reply for client request. + * Only ever to be called from the receiving thread. The reply message is crafted + * and then handed over to the sending thread. + * + * @param destination SocketAddress of the client + * @param messageId The same ID the client used in it's request. + * It's echoed back to the client to enable request bursts, and has no meaning for the + * server. + * @param status A TaskStatus instance to be serialized to json and sent to the client. + */ + private void send( SocketAddress destination, byte[] buffer ) + { + final DatagramPacket packet; + try { + packet = new DatagramPacket( buffer, buffer.length, destination ); + } catch ( SocketException e ) { + log.warn( "Could not construct datagram packet for target " + destination.toString() ); + e.printStackTrace(); + return; + } + sender.send( packet ); + } + + /** + * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode + */ + @Override + public void run() + { + byte readBuffer[] = new byte[ 66000 ]; + try { + while ( !Global.doShutdown ) { + DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length ); + try { + socket.receive( packet ); + } catch ( IOException e ) { + log.info( "IOException on UDP socket when reading: " + e.getMessage() ); + Thread.sleep( 100 ); + continue; + } + if ( packet.getLength() < 2 ) { + log.debug( "Message too short" ); + continue; + } + String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 ); + try { + byte[] reply = RequestParser.handle( payload ); + if ( reply != null ) + send( packet.getSocketAddress(), reply ); + } catch ( Throwable t ) { + log.error( "Exception in RequestParser: " + t.getMessage() ); + t.printStackTrace(); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + Global.doShutdown = true; + log.info( "UDP receiver finished." ); + } + } + + /** + * Private sending thread. + * Use blocking queue, wait for packet to be added to it, then try to send. + */ + static class Sender implements Runnable + { + + /** + * Queue to stuff outgoing packets into. + */ + private final BlockingQueue<DatagramPacket> queue = new LinkedBlockingQueue<>( 128 ); + + /** + * Wait until something is put into the queue, then send it. + */ + @Override + public void run() + { + try { + while ( !Global.doShutdown ) { + final DatagramPacket packet; + packet = queue.take(); + try { + socket.send( packet ); + } catch ( IOException e ) { + log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString() ); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + Global.doShutdown = true; + log.info( "UDP sender finished." ); + } + } + + /** + * Add something to the outgoing packet queue. + * Called from the receiving thread. + */ + public void send( DatagramPacket packet ) + { + if ( queue.offer( packet ) ) + return; + log.warn( "Could not add packet to queue: Full" ); + } + + } + +} |