summaryrefslogblamecommitdiffstats
path: root/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java
blob: e37b3d73d282e7347ff7f0233bdbb44a2dc7b9f5 (plain) (tree)
































































































































































































                                                                                                                                            
package org.openslx.taskmanager.network;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;

/**
 * The network listener that accepts incoming TCP connections, authenticates
 * each client, then processes its requests and sends back the replies.
 * <p>
 * Wire format (both directions): a 4 byte big-endian length as read/written by
 * {@link DataInputStream#readInt()} / {@link DataOutputStream#writeInt(int)},
 * followed by that many bytes of UTF-8 encoded payload. The first message of a
 * connection must be the password; every following message is handed to the
 * request parser.
 */
public class NetworkHandlerTcp extends NetworkHandlerBase
{

	private static final Logger log = Logger.getLogger( NetworkHandlerTcp.class );

	/**
	 * TCP server socket on which client connections are accepted.
	 */
	private final ServerSocket socket;

	/**
	 * Runs one {@link ClientTask} per connection. Because a {@link SynchronousQueue}
	 * is used nothing is ever queued: at most 8 clients are served concurrently and
	 * any further connection is rejected. Non-core threads are released after one
	 * idle minute.
	 */
	private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 8, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );

	/**
	 * Initialize the NetworkHandler by opening the listening socket.
	 *
	 * @param port TCP port to listen on
	 * @param listenAddress local address to bind to
	 * @param parser parser that handles the requests received from clients
	 * @throws IOException if the server socket cannot be opened
	 */
	public NetworkHandlerTcp( int port, InetAddress listenAddress, RequestParser parser ) throws IOException
	{
		super( parser );
		socket = new ServerSocket( port, 10, listenAddress );
		log.info( "Listening on TCP:" + port );
		threadPool.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() );
	}

	/**
	 * Stop accepting connections: closes the listening socket (which makes a
	 * blocked accept() in {@link #run()} fail) and shuts down the worker pool.
	 * Client connections that are currently being served are not interrupted.
	 * Calling this more than once is harmless.
	 */
	public void shutdown()
	{
		try {
			socket.close();
		} catch ( IOException ignored ) {
			// Nothing useful can be done if closing the listening socket fails
		}
		// Without this the pool's core thread would stay alive forever
		threadPool.shutdown();
	}

	/**
	 * Main loop of the accepting thread - wait until a client connects, then hand
	 * the connection over to a worker from the thread pool.
	 */
	@Override
	public void run()
	{
		try {
			while ( !Global.doShutdown ) {
				Socket client;
				try {
					client = socket.accept();
				} catch ( IOException e1 ) {
					log.warn( "ACCEPT fail", e1 );
					break;
				}
				try {
					threadPool.execute( new ClientTask( client ) );
				} catch ( RejectedExecutionException e ) {
					// All workers are busy (or the pool was shut down), drop the client
					log.warn( "Rejecting TCP client " + client.getRemoteSocketAddress() + ": no free worker" );
					closeQuietly( client );
				}
			}
		} finally {
			// NOTE(review): reason for setting the interrupt flag is not visible in this file; kept as is
			Thread.currentThread().interrupt();
			Global.doShutdown = true;
			// Release the listening socket and the worker pool regardless of why the loop ended
			shutdown();
			log.info( "TCP receiver finished." );
		}
	}

	/**
	 * Close the given client socket, ignoring any error while doing so.
	 */
	private static void closeQuietly( Socket client )
	{
		try {
			client.close();
		} catch ( IOException ignored ) {
			// Connection is being discarded anyway
		}
	}

	/**
	 * Serves a single client connection: checks the password, then handles
	 * requests until the client disconnects, sends an invalid message, or a
	 * reply cannot be delivered.
	 */
	private class ClientTask implements Runnable
	{
		private final Socket socket;
		private DataOutputStream writer = null;
		private DataInputStream reader = null;

		public ClientTask( Socket client )
		{
			this.socket = client;
		}

		@Override
		public void run()
		{
			try {
				try {
					// Drop clients that stay silent for 15 minutes
					socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) );
					reader = new DataInputStream( socket.getInputStream() );
					writer = new DataOutputStream( socket.getOutputStream() );
				} catch ( IOException e ) {
					log.info( "IOException on TCP socket when setting up streams", e );
					return;
				}
				// First message has to be the password
				String payload = readMsg( reader );
				if ( payload == null )
					return;
				int i = payload.indexOf( ' ' );
				if ( i != -1 ) {
					// For future extensibility we throw away everything after the first space (including the space)
					payload = payload.substring( 0, i );
				}
				if ( !payload.equals( Global.PASSWORD ) ) {
					sendMsg( "ERROR,Wrong password" );
					return;
				}
				// Authenticated, process requests until the connection ends
				while ( !socket.isClosed() ) {
					payload = readMsg( reader );
					if ( payload == null )
						return;
					byte[] reply;
					try {
						reply = parser.handle( payload );
					} catch ( Throwable t ) {
						// A failing request must not kill the connection; no reply is sent for it
						log.error( "Exception in RequestParser", t );
						log.error( "Payload was: " + payload );
						continue;
					}
					if ( reply != null ) {
						sendMsg( reply );
					}
				}
			} catch ( SendException e ) {
				log.warn( "Cannot send reply to client", e );
			} finally {
				if ( writer != null ) {
					try {
						writer.flush();
						writer.close();
					} catch ( IOException ignored ) {
						// Best effort, the socket gets closed below in any case
					}
				}
				// Separate from the writer cleanup so a failure there cannot leak the socket
				closeQuietly( socket );
			}
		}

		/**
		 * Send a text reply, encoded as UTF-8.
		 *
		 * @throws SendException if writing to the client fails
		 */
		private void sendMsg( String reply ) throws SendException
		{
			sendMsg( reply.getBytes( StandardCharsets.UTF_8 ) );
		}

		/**
		 * Send a reply: 4 byte length followed by the payload bytes.
		 *
		 * @throws SendException if writing to the client fails
		 */
		private void sendMsg( byte[] reply ) throws SendException
		{
			try {
				writer.writeInt( reply.length );
				writer.write( reply );
			} catch ( IOException e ) {
				throw new SendException( e );
			}
		}

		/**
		 * Read one length-prefixed message from the client.
		 *
		 * @return the payload decoded as UTF-8 (empty string for a zero-length
		 *         message), or <code>null</code> if the connection failed, timed
		 *         out, or the announced size is negative or exceeds
		 *         {@link Global#MAX_REQUEST_SIZE}
		 */
		private String readMsg( DataInputStream reader )
		{
			int bytes;
			try {
				bytes = reader.readInt();
			} catch ( IOException e ) {
				// This should be fine... Client went away
				return null;
			}
			if ( bytes < 0 || bytes > Global.MAX_REQUEST_SIZE ) {
				log.info( "Invalid request size: " + bytes );
				return null;
			}
			if ( bytes == 0 )
				return ""; // Nothing to read
			byte[] buffer = new byte[ bytes ];
			try {
				reader.readFully( buffer );
			} catch ( IOException e ) {
				log.warn( "Client went away when trying to read payload" );
				return null;
			}
			return new String( buffer, StandardCharsets.UTF_8 );
		}

	}

	/**
	 * Signals that a reply could not be written to the client; carries the
	 * underlying I/O error as its cause.
	 */
	private static class SendException extends Exception
	{
		private static final long serialVersionUID = 1L;

		public SendException( Throwable cause )
		{
			super( cause );
		}
	}

}