// summary | refs | log | blame | commit | diff | stats
// path: root/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java
// blob: 3e2c8fd747c5b2ce457979b342ce98a59c20c488 (plain) (tree)



















































































































































































                                                                                                                                               
package org.openslx.taskmanager.network;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;

/**
 * The network listener that will receive incoming UDP packets, try to process
 * them, and then send a reply.
 * <p>
 * Two threads share a single {@link DatagramSocket}: the receiving thread (an
 * instance of this class) reads requests and passes them to the request parser,
 * while the sending thread ({@link Sender}) drains a bounded queue of replies.
 * When either thread terminates it sets {@code Global.doShutdown}.
 */
public class NetworkHandler implements Runnable
{

	private static final Logger log = Logger.getLogger( NetworkHandler.class );

	// Static part

	private static Thread recvThread = null;
	private static Thread sendThread = null;
	/**
	 * Sender instance (Runnable handling outgoing packets)
	 */
	private static Sender sender = null;
	/**
	 * UDP socket for sending and receiving.
	 */
	private static DatagramSocket socket;

	/**
	 * Initialize the NetworkHandler by starting threads and opening the socket.
	 *
	 * @throws SocketException if the UDP socket could not be opened or bound
	 * @throws IllegalStateException if the handler has already been initialized
	 */
	public static void init() throws SocketException
	{
		if ( recvThread != null )
			throw new IllegalStateException( "Already initialized" );
		socket = new DatagramSocket( Global.LISTEN_PORT, Global.LISTEN_ADDRESS );
		// Set up the sender completely before any thread is started: the receiving
		// thread hands replies to 'sender', so it must never observe it as null.
		sender = new Sender();
		sendThread = new Thread( sender );
		recvThread = new Thread( new NetworkHandler() );
		sendThread.start();
		recvThread.start();
	}

	/**
	 * Stop network handling: close the socket and wake up the sending thread.
	 * Safe to call even if {@link #init()} was never called or failed.
	 */
	public static void shutdown()
	{
		if ( socket != null )
			socket.close();
		// Closing the socket does not wake a thread blocked in queue.take(),
		// so the sender has to be interrupted explicitly or join() would hang.
		if ( sendThread != null )
			sendThread.interrupt();
	}

	/**
	 * Wait for the receiving and the sending thread to terminate.
	 * If the calling thread is interrupted while waiting, its interrupt flag is
	 * restored and the method returns early.
	 */
	public static void join()
	{
		try {
			if ( recvThread != null )
				recvThread.join();
			if ( sendThread != null )
				sendThread.join();
		} catch ( InterruptedException e ) {
			Thread.currentThread().interrupt();
		}
	}

	// Class part

	/**
	 * Prepare and enqueue reply for client request.
	 * Only ever to be called from the receiving thread. The reply is wrapped in a
	 * datagram packet and then handed over to the sending thread.
	 * 
	 * @param destination SocketAddress of the client
	 * @param buffer The complete reply payload to send to the client
	 */
	private void send( SocketAddress destination, byte[] buffer )
	{
		final DatagramPacket packet;
		try {
			packet = new DatagramPacket( buffer, buffer.length, destination );
		} catch ( SocketException e ) {
			log.warn( "Could not construct datagram packet for target " + destination, e );
			return;
		}
		sender.send( packet );
	}

	/**
	 * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode
	 */
	@Override
	public void run()
	{
		// 66000 bytes is more than the largest possible UDP datagram (65535 bytes)
		final byte[] readBuffer = new byte[ 66000 ];
		try {
			while ( !Global.doShutdown ) {
				DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length );
				try {
					socket.receive( packet );
				} catch ( IOException e ) {
					// A closed socket never recovers; retrying would only spin and spam the log.
					if ( socket.isClosed() )
						break;
					log.info( "IOException on UDP socket when reading: " + e.getMessage() );
					Thread.sleep( 100 );
					continue;
				}
				if ( packet.getLength() < 2 ) {
					log.debug( "Message too short" );
					continue;
				}
				String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
				try {
					byte[] reply = RequestParser.handle( payload );
					if ( reply != null )
						send( packet.getSocketAddress(), reply );
				} catch ( Throwable t ) {
					// Deliberately broad: one bad request must not kill the receiving thread.
					log.error( "Exception in RequestParser: " + t.getMessage(), t );
				}
			}
		} catch ( InterruptedException e ) {
			Thread.currentThread().interrupt();
		} finally {
			Global.doShutdown = true;
			log.info( "UDP receiver finished." );
		}
	}

	/**
	 * Private sending thread.
	 * Use blocking queue, wait for packet to be added to it, then try to send.
	 */
	static class Sender implements Runnable
	{

		/**
		 * Queue to stuff outgoing packets into. Bounded to 128 entries; packets
		 * offered while it is full are dropped (see {@link #send(DatagramPacket)}).
		 */
		private final BlockingQueue<DatagramPacket> queue = new LinkedBlockingQueue<>( 128 );

		/**
		 * Wait until something is put into the queue, then send it.
		 * Terminates when {@code Global.doShutdown} is set or the thread is interrupted.
		 */
		@Override
		public void run()
		{
			try {
				while ( !Global.doShutdown ) {
					final DatagramPacket packet = queue.take();
					try {
						socket.send( packet );
					} catch ( IOException e ) {
						log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress() );
					}
				}
			} catch ( InterruptedException e ) {
				Thread.currentThread().interrupt();
			} finally {
				Global.doShutdown = true;
				log.info( "UDP sender finished." );
			}
		}

		/**
		 * Add something to the outgoing packet queue.
		 * Called from the receiving thread. Never blocks: if the queue is full
		 * the packet is dropped and a warning is logged.
		 *
		 * @param packet the fully addressed packet to send
		 */
		public void send( DatagramPacket packet )
		{
			if ( queue.offer( packet ) )
				return;
			log.warn( "Could not add packet to queue: Full" );
		}

	}

}