package org.openslx.taskmanager.network; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.SocketAddress; import java.net.SocketException; import java.nio.charset.StandardCharsets; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; import org.openslx.taskmanager.Global; /** * The network listener that will receive incoming UDP packets, try to process * them, and then send a reply. */ public class NetworkHandlerUdp extends NetworkHandlerBase { private static final Logger log = Logger.getLogger( NetworkHandlerUdp.class ); private Thread sendThread = null; /** * Sender instance (Runnable handling outgoing packets) */ private final Sender sender; /** * UDP socket for sending and receiving. */ private final DatagramSocket socket; /** * Initialize the NetworkHandler by starting threads and opening the socket. */ public NetworkHandlerUdp( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException { super(parser); socket = new DatagramSocket( port, listenAddress ); log.info( "Listening on UDP:" + port ); sendThread = new Thread( sender = new Sender() ); } public void shutdown() { socket.close(); } // Class part /** * Prepare and enqueue reply for client request. * Only ever to be called from the receiving thread. The reply message is crafted * and then handed over to the sending thread. * * @param destination SocketAddress of the client * @param messageId The same ID the client used in it's request. * It's echoed back to the client to enable request bursts, and has no meaning for the * server. * @param status A TaskStatus instance to be serialized to json and sent to the client. */ private void send( SocketAddress destination, byte[] buffer ) { final DatagramPacket packet; try { packet = new DatagramPacket( buffer, buffer.length, destination ); } catch ( Exception e ) { log.warn( "Could not construct datagram packet for target " + destination.toString() ); e.printStackTrace(); return; } sender.send( packet ); } /** * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode */ @Override public void run() { byte readBuffer[] = new byte[ 66000 ]; try { sendThread.start(); while ( !Global.doShutdown ) { DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length ); try { socket.receive( packet ); } catch ( IOException e ) { log.info( "IOException on UDP socket when reading: " + e.getMessage() ); Thread.sleep( 100 ); continue; } if ( packet.getLength() < 2 ) { log.debug( "Message too short" ); continue; } String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 ); byte[] reply; try { reply = parser.handle( payload ); } catch ( Throwable t ) { log.error( "Exception in RequestParser: " + t.toString() ); log.error( "Payload was: " + payload ); t.printStackTrace(); continue; } if ( reply != null ) send( packet.getSocketAddress(), reply ); } } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } finally { Global.doShutdown = true; sendThread.interrupt(); log.info( "UDP receiver finished." ); } } /** * Private sending thread. * Use blocking queue, wait for packet to be added to it, then try to send. */ private class Sender implements Runnable { /** * Queue to stuff outgoing packets into. */ private final BlockingQueue queue = new LinkedBlockingQueue<>( 128 ); /** * Wait until something is put into the queue, then send it. */ @Override public void run() { try { while ( !Global.doShutdown ) { final DatagramPacket packet; packet = queue.take(); try { socket.send( packet ); } catch ( IOException e ) { log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString(), e ); } } } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } finally { Global.doShutdown = true; log.info( "UDP sender finished." ); } } /** * Add something to the outgoing packet queue. * Called from the receiving thread. */ public void send( DatagramPacket packet ) { if ( queue.offer( packet ) ) return; log.warn( "Could not add packet to queue: Full" ); } } }