package org.openslx.taskmanager.network; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.openslx.taskmanager.Global; /** * The network listener that will receive incoming UDP packets, try to process * them, and then send a reply. */ public class NetworkHandlerTcp extends NetworkHandlerBase { private static final Logger log = Logger.getLogger( NetworkHandlerTcp.class ); /** * UDP socket for sending and receiving. */ private final ServerSocket socket; private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 8, 1, TimeUnit.MINUTES, new SynchronousQueue() ); /** * Initialize the NetworkHandler by starting threads and opening the socket. */ public NetworkHandlerTcp( int port, InetAddress listenAddress, RequestParser parser ) throws IOException { super( parser ); socket = new ServerSocket( port, 10, listenAddress ); log.info( "Listening on TCP:" + port ); threadPool.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() ); } public void shutdown() { try { socket.close(); } catch ( IOException e ) { } } /** * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode */ @Override public void run() { try { while ( !Global.doShutdown ) { Socket client; try { client = socket.accept(); } catch ( IOException e1 ) { log.warn( "ACCEPT fail", e1 ); break; } try { threadPool.execute( new ClientTask( client ) ); } catch ( RejectedExecutionException e ) { try { client.close(); } catch ( IOException e1 ) { } } } } finally { Thread.currentThread().interrupt(); Global.doShutdown = true; log.info( "UDP receiver finished." ); } } private class ClientTask implements Runnable { private final Socket socket; private DataOutputStream writer = null; private DataInputStream reader = null; public ClientTask( Socket client ) { this.socket = client; } @Override public void run() { try { try { socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) ); reader = new DataInputStream( socket.getInputStream() ); writer = new DataOutputStream( socket.getOutputStream() ); } catch ( IOException e ) { log.info( "IOException on TCP socket when setting up streams", e ); return; } String payload = readMsg( reader ); if ( payload == null ) return; int i = payload.indexOf( ' ' ); if ( i != -1 ) { // For future extensibility we throw away everything after the first space (including the space) payload = payload.substring( 0, i ); } if ( !payload.equals( Global.PASSWORD ) ) { sendMsg( "ERROR,Wrong password" ); return; } while ( !socket.isClosed() ) { payload = readMsg( reader ); if ( payload == null ) return; byte[] reply; try { reply = parser.handle( payload ); } catch ( Throwable t ) { log.error( "Exception in RequestParser", t ); log.error( "Payload was: " + payload ); continue; } if ( reply != null ) { sendMsg( reply ); } } } catch ( SendException e ) { log.warn( "Cannot send reply to client", e ); } finally { try { if ( writer != null ) { writer.flush(); writer.close(); } socket.close(); } catch ( IOException e ) { } } } private void sendMsg( String reply ) throws SendException { sendMsg( reply.getBytes( StandardCharsets.UTF_8 ) ); } private void sendMsg( byte[] reply ) throws SendException { try { writer.writeInt( reply.length ); writer.write( reply ); } catch ( IOException e ) { throw new SendException(); } } private String readMsg( DataInputStream reader ) { int bytes; try { bytes = reader.readInt(); } catch ( IOException e ) { // This should be fine... Client went away return null; } if ( bytes < 0 || bytes > Global.MAX_REQUEST_SIZE ) { log.info( "Invalid request size: " + bytes ); return null; } if ( bytes == 0 ) return ""; // Nothing to read byte[] buffer = new byte[ bytes ]; try { reader.readFully( buffer ); } catch ( IOException e ) { log.warn( "Client went away when trying to read payload" ); return null; } return new String( buffer, StandardCharsets.UTF_8 ); } } private static class SendException extends Exception {} }