summaryrefslogblamecommitdiffstats
path: root/src/main/java/org/openslx/imagemaster/localrpc/NetworkHandler.java
blob: 7223435c70b5cb4f172c7692c18b244552cd5d63 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14













                                                       
                                              

                                                      












































































































                                                                                                                            
                                                 




























































                                                                                                                                               
package org.openslx.imagemaster.localrpc;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;
import org.openslx.bwlp.thrift.iface.ClientSessionData;
import org.openslx.bwlp.thrift.iface.UserInfo;
import org.openslx.imagemaster.session.Session;
import org.openslx.imagemaster.session.SessionManager;

import com.google.gson.Gson;

/**
 * The network listener that will receive incoming UDP packets, try to process
 * them, and then send a reply.
 */
public class NetworkHandler implements Runnable
{

	private static final Logger log = Logger.getLogger( NetworkHandler.class );

	private Thread sendThread = null;
	/**
	 * Sender instance (Runnable handling outgoing packets)
	 */
	private final Sender sender;
	/**
	 * UDP socket for sending and receiving.
	 */
	private final DatagramSocket socket;
	/**
	 * Gson class
	 */
	private final Gson gson = new Gson();

	/**
	 * Initialize the NetworkHandler by starting threads and opening the socket.
	 */
	public NetworkHandler( int port, InetAddress listenAddress ) throws SocketException
	{
		socket = new DatagramSocket( port, listenAddress );
		sendThread = new Thread( sender = new Sender() );
	}

	public void shutdown()
	{
		socket.close();
	}

	/**
	 * Prepare and enqueue reply for client request.
	 * Only ever to be called from the receiving thread. The reply message is crafted
	 * and then handed over to the sending thread.
	 * 
	 * @param destination SocketAddress of the client
	 * @param messageId The same ID the client used in it's request.
	 *           It's echoed back to the client to enable request bursts, and has no meaning for the
	 *           server.
	 * @param status A TaskStatus instance to be serialized to json and sent to the client.
	 */
	private void send( SocketAddress destination, byte[] buffer )
	{
		final DatagramPacket packet;
		try {
			packet = new DatagramPacket( buffer, buffer.length, destination );
		} catch ( SocketException e ) {
			log.warn( "Could not construct datagram packet for target " + destination.toString() );
			e.printStackTrace();
			return;
		}
		sender.send( packet );
	}

	/**
	 * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode
	 */
	@Override
	public void run()
	{
		byte readBuffer[] = new byte[ 66000 ];
		try {
			sendThread.start();
			while ( !Thread.interrupted() ) {
				DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length );
				try {
					socket.receive( packet );
				} catch ( IOException e ) {
					log.info( "IOException on UDP socket when reading: " + e.getMessage() );
					Thread.sleep( 100 );
					continue;
				}
				if ( packet.getLength() < 2 ) {
					log.debug( "Message too short" );
					continue;
				}
				String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
				try {
					String reply = handle( payload );
					if ( reply != null )
						send( packet.getSocketAddress(), reply.getBytes( StandardCharsets.UTF_8 ) );
				} catch ( Throwable t ) {
					log.error( "Exception in RequestParser: " + t.toString() );
					log.error( "Payload was: " + payload );
					t.printStackTrace();
				}
			}
		} catch ( InterruptedException e ) {
			Thread.currentThread().interrupt();
		} finally {
			sendThread.interrupt();
			log.info( "UDP receiver finished." );
		}
	}

	private String handle( String payload )
	{
		try {
			JsonUser ju = gson.fromJson( payload, JsonUser.class );
			UserInfo u = ju.toUser();
			if ( u == null ) {
				log.warn( "Invalid or inomplete RPC data (" + payload + ")" );
				return "Invalid or incomplete RPC data";
			}
			ClientSessionData sd = SessionManager.addSession( new Session( u ) );
			return "TOKEN:" + sd.authToken + " SESSIONID:" + sd.sessionId;
		} catch ( Throwable t ) {
			log.error( "Exception on json decode", t );
		}
		return "Json error";
	}

	/**
	 * Private sending thread.
	 * Use blocking queue, wait for packet to be added to it, then try to send.
	 */
	private class Sender implements Runnable
	{

		/**
		 * Queue to stuff outgoing packets into.
		 */
		private final BlockingQueue<DatagramPacket> queue = new LinkedBlockingQueue<>( 128 );

		/**
		 * Wait until something is put into the queue, then send it.
		 */
		@Override
		public void run()
		{
			try {
				while ( !Thread.interrupted() ) {
					final DatagramPacket packet;
					packet = queue.take();
					try {
						socket.send( packet );
					} catch ( IOException e ) {
						log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString() );
					}
				}
			} catch ( InterruptedException e ) {
				Thread.currentThread().interrupt();
			} finally {
				log.info( "UDP sender finished." );
			}
		}

		/**
		 * Add something to the outgoing packet queue.
		 * Called from the receiving thread.
		 */
		public void send( DatagramPacket packet )
		{
			if ( queue.offer( packet ) )
				return;
			log.warn( "Could not add packet to queue: Full" );
		}

	}

}