summaryrefslogblamecommitdiffstats
path: root/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java
blob: 4c1296015b7c6fe8496e111891538bbbe54aec46 (plain) (tree)
1
2
3
4
5
6




                                        
                            












                                                                              
                                                         

 
                                                                                      
 
                                         


                                                               
                                    


                                                

                                            


                                                                                    
                                                                                                                    
         
                              
                                                                   
                                                       
                                                                 

         
                              



                               

















                                                                                                        
                                         














                                                                                                               
                                           













                                                                                                                         
                                             
                                     
                                                                         
                                                         

                                                                                                   
                                                            
                                                 
                                 

                                                                                 




                                                           
                                               







                                                                                   
                                                



















                                                                                                     
                                                                                                                                                  























                                                                          
package org.openslx.taskmanager.network;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;

/**
 * The network listener that will receive incoming UDP packets, try to process
 * them, and then send a reply.
 * <p>
 * Receiving happens in {@link #run()}. Replies are handed over to a separate
 * sender thread through a bounded queue; if that queue is full the reply is
 * dropped instead of blocking the receive loop.
 */
public class NetworkHandlerUdp extends NetworkHandlerBase
{

	private static final Logger log = Logger.getLogger( NetworkHandlerUdp.class );

	/**
	 * Thread running the {@link Sender}. Started from {@link #run()}.
	 */
	private final Thread sendThread;
	/**
	 * Sender instance (Runnable handling outgoing packets)
	 */
	private final Sender sender;
	/**
	 * UDP socket for sending and receiving.
	 */
	private final DatagramSocket socket;

	/**
	 * Initialize the NetworkHandler by opening the socket and preparing the
	 * sender thread. The sender thread is not started here, but when
	 * {@link #run()} is entered.
	 * 
	 * @param port UDP port to bind to
	 * @param listenAddress local address to bind to
	 * @param parser parser that turns a request payload into a reply
	 * @throws SocketException if the socket could not be opened or bound
	 */
	public NetworkHandlerUdp( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException
	{
		super(parser);
		socket = new DatagramSocket( port, listenAddress );
		log.info( "Listening on UDP:" + port );
		sender = new Sender();
		sendThread = new Thread( sender );
	}

	/**
	 * Close the UDP socket. A receive call blocked in {@link #run()} fails with
	 * an IOException as a result, which makes the receive loop terminate.
	 */
	public void shutdown()
	{
		socket.close();
	}

	// Class part

	/**
	 * Enqueue a reply for a client.
	 * Only ever to be called from the receiving thread. The datagram is built
	 * here and then handed over to the sending thread.
	 * 
	 * @param destination SocketAddress of the client
	 * @param buffer raw bytes of the reply, sent as-is as the datagram payload
	 */
	private void send( SocketAddress destination, byte[] buffer )
	{
		final DatagramPacket packet;
		try {
			packet = new DatagramPacket( buffer, buffer.length, destination );
		} catch ( Exception e ) {
			// Plain concatenation instead of destination.toString(): destination may be
			// null here, and the error handler itself must not throw.
			log.warn( "Could not construct datagram packet for target " + destination, e );
			return;
		}
		sender.send( packet );
	}

	/**
	 * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode
	 * it and queue the reply, if any. Runs until {@link Global#doShutdown} is set, the
	 * socket is closed, or the thread is interrupted. On exit, the global shutdown flag is
	 * set and the sender thread is interrupted.
	 */
	@Override
	public void run()
	{
		// Larger than any UDP datagram can be, so received payloads are never truncated
		byte readBuffer[] = new byte[ 66000 ];
		try {
			sendThread.start();
			while ( !Global.doShutdown ) {
				DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length );
				try {
					socket.receive( packet );
				} catch ( IOException e ) {
					if ( socket.isClosed() ) {
						// A closed socket never recovers; retrying would just spin and spam the log
						log.info( "UDP socket has been closed, stopping receiver." );
						break;
					}
					log.info( "IOException on UDP socket when reading: " + e.getMessage() );
					// Back off briefly so a persistent error does not turn into a busy loop
					Thread.sleep( 100 );
					continue;
				}
				if ( packet.getLength() < 2 ) {
					log.debug( "Message too short" );
					continue;
				}
				// Copy the payload out of the shared buffer before the next receive overwrites it
				String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
				byte[] reply;
				try {
					reply = parser.handle( payload );
				} catch ( Throwable t ) {
					// A failing request must not kill the receiver; log it and carry on
					log.error( "Exception in RequestParser: " + t.toString(), t );
					log.error( "Payload was: " + payload );
					continue;
				}
				if ( reply != null )
					send( packet.getSocketAddress(), reply );
			}
		} catch ( InterruptedException e ) {
			Thread.currentThread().interrupt();
		} finally {
			Global.doShutdown = true;
			// Wake the sender in case it is blocked waiting on an empty queue
			sendThread.interrupt();
			log.info( "UDP receiver finished." );
		}
	}

	/**
	 * Private sending thread.
	 * Use blocking queue, wait for packet to be added to it, then try to send.
	 */
	private class Sender implements Runnable
	{

		/**
		 * Queue to stuff outgoing packets into. Bounded to 128 entries.
		 */
		private final BlockingQueue<DatagramPacket> queue = new LinkedBlockingQueue<>( 128 );

		/**
		 * Wait until something is put into the queue, then send it.
		 * Runs until {@link Global#doShutdown} is set or the thread is interrupted,
		 * and sets the global shutdown flag on exit.
		 */
		@Override
		public void run()
		{
			try {
				while ( !Global.doShutdown ) {
					final DatagramPacket packet = queue.take();
					try {
						socket.send( packet );
					} catch ( IOException e ) {
						log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress(), e );
					}
				}
			} catch ( InterruptedException e ) {
				Thread.currentThread().interrupt();
			} finally {
				Global.doShutdown = true;
				log.info( "UDP sender finished." );
			}
		}

		/**
		 * Add something to the outgoing packet queue.
		 * Called from the receiving thread. Never blocks: if the queue is full,
		 * the packet is dropped and a warning is logged.
		 */
		public void send( DatagramPacket packet )
		{
			if ( queue.offer( packet ) )
				return;
			log.warn( "Could not add packet to queue: Full" );
		}

	}

}