summaryrefslogblamecommitdiffstats
path: root/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java
blob: 8b8b6d56451ee6d12b885af2c0424927b0378831 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11










                                                       
                                          

                                               
                                                 

















                                                                                      








                                                                                                                                                                 














































                                                                                                                
                                                             







                                                       
 









                                                  























                                                                                                                                        
                                                                                       





















                                                                                             
 







                                                                            
                                                                                            










                                                                
                                                                                            












                                                                             
                                                                                           





                                                                                           
                 

         



                                                            

 
package org.openslx.taskmanager.network;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;

/**
 * The network listener that accepts incoming TCP connections and hands each
 * one to a worker thread. The worker authenticates the client and then
 * processes its requests, sending back a reply whenever the parser returns one.
 * <p>
 * Every message, in both directions, is a 4 byte length as written by
 * {@link DataOutputStream#writeInt(int)}, followed by that many bytes of UTF-8
 * encoded payload. The first message of a connection must be the password,
 * optionally followed by a space and further data, which is ignored.
 */
public class NetworkHandlerTcp extends NetworkHandlerBase
{

	private static final Logger log = Logger.getLogger( NetworkHandlerTcp.class );

	/** Upper bound of concurrently served clients; connections beyond that are rejected. */
	private static final int MAX_CLIENT_THREADS = 64;

	/** How long a connection may stay idle while waiting for the next message header. */
	private static final int IDLE_TIMEOUT_MS = (int)TimeUnit.MINUTES.toMillis( 15 );

	/** How long the payload may take to arrive once its length header has been read. */
	private static final int PAYLOAD_TIMEOUT_MS = (int)TimeUnit.SECONDS.toMillis( 2 );

	/** Socket timeout that is set right before a reply is written. */
	private static final int SEND_TIMEOUT_MS = (int)TimeUnit.SECONDS.toMillis( 15 );

	/**
	 * TCP server socket on which client connections are accepted.
	 */
	private final ServerSocket socket;

	/**
	 * Workers that serve the accepted connections, one task per client. The
	 * SynchronousQueue means tasks are never queued: if all threads are busy,
	 * execute() throws RejectedExecutionException (AbortPolicy).
	 */
	private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 0, MAX_CLIENT_THREADS, 1, TimeUnit.MINUTES,
			new SynchronousQueue<Runnable>(), new ThreadFactory() {
				private final AtomicInteger id = new AtomicInteger();

				@Override
				public Thread newThread( Runnable r )
				{
					return new Thread( r, "TCP-" + id.incrementAndGet() );
				}
			}, new ThreadPoolExecutor.AbortPolicy() );

	/**
	 * Initialize the NetworkHandler by opening the listening socket.
	 *
	 * @param port TCP port to listen on
	 * @param listenAddress local address to bind to
	 * @param parser parser that handles the requests of authenticated clients
	 * @throws IOException if the server socket cannot be opened
	 */
	public NetworkHandlerTcp( int port, InetAddress listenAddress, RequestParser parser ) throws IOException
	{
		super( parser );
		socket = new ServerSocket( port, 10, listenAddress );
		log.info( "Listening on TCP:" + port );
	}

	/**
	 * Close the listening socket, which makes a pending accept() in
	 * {@link #run()} fail and thus ends the accept loop, and stop the worker
	 * pool from taking new clients. Clients that are currently being served
	 * are not disconnected.
	 */
	public void shutdown()
	{
		try {
			socket.close();
		} catch ( IOException e ) {
			// Best effort, there is nothing sensible left to do with this socket
			log.debug( "Error closing TCP server socket", e );
		}
		threadPool.shutdown();
	}

	/**
	 * Main loop of the accepting thread - wait until a client connects, then
	 * pass the connection to a worker thread.
	 */
	@Override
	public void run()
	{
		try {
			while ( !Global.doShutdown ) {
				Socket client;
				try {
					client = socket.accept();
				} catch ( IOException e1 ) {
					if ( socket.isClosed() ) {
						// Expected when shutdown() closed the socket underneath us
						log.info( "TCP server socket closed, leaving accept loop" );
					} else {
						log.warn( "ACCEPT fail", e1 );
					}
					break;
				}
				try {
					threadPool.execute( new ClientTask( client ) );
				} catch ( RejectedExecutionException e ) {
					log.warn( "Cannot serve TCP client " + client.getRemoteSocketAddress()
							+ ", no worker thread available; closing connection" );
					try {
						client.close();
					} catch ( IOException ignored ) {
						// We are dropping the connection anyway
					}
				}
			}
		} finally {
			// No more connections will be accepted, so the pool does not need to
			// take new tasks; tasks that are already running are left to finish.
			threadPool.shutdown();
			// NOTE(review): kept from the original; presumably meant to signal whatever
			// runs on this thread after run() returns - confirm whether it is needed.
			Thread.currentThread().interrupt();
			Global.doShutdown = true;
			log.info( "TCP receiver finished." );
		}
	}

	/**
	 * Serves a single client connection: password check first, then a
	 * request/reply loop until the client goes away or an error occurs.
	 */
	private class ClientTask implements Runnable
	{
		private final Socket socket;
		private DataOutputStream writer = null;
		private DataInputStream reader = null;

		public ClientTask( Socket client )
		{
			this.socket = client;
		}

		@Override
		public void run()
		{
			try {
				try {
					reader = new DataInputStream( socket.getInputStream() );
					writer = new DataOutputStream( socket.getOutputStream() );
				} catch ( IOException e ) {
					log.info( "IOException on TCP socket when setting up streams", e );
					return;
				}
				if ( authenticate() ) {
					serveRequests();
				}
			} catch ( SendException e ) {
				log.warn( "Cannot send reply to client", e );
			} finally {
				closeConnection();
			}
		}

		/**
		 * Read the first message and compare it to the configured password.
		 *
		 * @return true if the client sent the correct password, false if it
		 *         went away, sent garbage, or sent a wrong password
		 * @throws SendException if the error reply cannot be sent
		 */
		private boolean authenticate() throws SendException
		{
			String payload = readMsg( reader );
			if ( payload == null )
				return false;
			int i = payload.indexOf( ' ' );
			if ( i != -1 ) {
				// For future extensibility we throw away everything after the first space (including the space)
				payload = payload.substring( 0, i );
			}
			if ( !payload.equals( Global.PASSWORD ) ) {
				log.info( "Wrong password from TCP client " + socket.getRemoteSocketAddress() );
				sendMsg( "ERROR,Wrong password" );
				return false;
			}
			return true;
		}

		/**
		 * Handle requests until the client disconnects or sends an invalid message.
		 *
		 * @throws SendException if a reply cannot be sent
		 */
		private void serveRequests() throws SendException
		{
			while ( !socket.isClosed() ) {
				String payload = readMsg( reader );
				if ( payload == null )
					return;
				byte[] reply;
				try {
					// NOTE(review): meaning of 8000 is defined by RequestParser.handle - confirm there
					reply = parser.handle( payload, 8000 );
				} catch ( Throwable t ) {
					// A failing request must not tear down the connection; no reply is sent for it
					log.error( "Exception in RequestParser", t );
					log.error( "Payload was: " + payload );
					continue;
				}
				if ( reply != null ) {
					sendMsg( reply );
				}
			}
		}

		/**
		 * Flush pending output and close the socket. Both steps are attempted
		 * independently so that a failing flush cannot prevent the close.
		 */
		private void closeConnection()
		{
			if ( writer != null ) {
				try {
					writer.flush();
				} catch ( IOException ignored ) {
					// Peer is most likely gone already
				}
			}
			try {
				// Closing the socket also closes its input and output stream
				socket.close();
			} catch ( IOException ignored ) {
				// Nothing left to clean up
			}
		}

		private void sendMsg( String reply ) throws SendException
		{
			sendMsg( reply.getBytes( StandardCharsets.UTF_8 ) );
		}

		/**
		 * Send one length-prefixed message to the client.
		 *
		 * @param reply payload to send
		 * @throws SendException if writing to the socket fails; the underlying
		 *             IOException is attached as cause
		 */
		private void sendMsg( byte[] reply ) throws SendException
		{
			try {
				// NOTE: SO_TIMEOUT only limits blocking reads (see Socket#setSoTimeout), so
				// this does not bound the write below; readMsg sets its own timeout
				// before every read.
				socket.setSoTimeout( SEND_TIMEOUT_MS );
				writer.writeInt( reply.length );
				writer.write( reply );
			} catch ( IOException e ) {
				throw new SendException( e );
			}
		}

		/**
		 * Read one length-prefixed message from the client.
		 *
		 * @param in stream to read from
		 * @return the decoded payload (empty string for a zero length message),
		 *         or null if the client went away, timed out, or announced a
		 *         size that is negative or larger than Global.MAX_REQUEST_SIZE
		 */
		private String readMsg( DataInputStream in )
		{
			int bytes;
			try {
				socket.setSoTimeout( IDLE_TIMEOUT_MS );
				bytes = in.readInt();
			} catch ( IOException e ) {
				// This should be fine... Client went away (or was idle for too long)
				return null;
			}
			if ( bytes < 0 || bytes > Global.MAX_REQUEST_SIZE ) {
				log.info( "Invalid request size: " + bytes );
				return null;
			}
			if ( bytes == 0 )
				return ""; // Nothing to read
			byte[] buffer = new byte[ bytes ];
			try {
				socket.setSoTimeout( PAYLOAD_TIMEOUT_MS );
				in.readFully( buffer );
			} catch ( IOException e ) {
				log.warn( "Client went away when trying to read payload" );
				return null;
			}
			return new String( buffer, StandardCharsets.UTF_8 );
		}

	}

	/**
	 * Signals that a reply could not be written to the client socket.
	 */
	private static class SendException extends Exception
	{
		private static final long serialVersionUID = 1L;

		public SendException( Throwable cause )
		{
			super( cause );
		}
	}

}