summaryrefslogtreecommitdiffstats
path: root/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java
diff options
context:
space:
mode:
Diffstat (limited to 'daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java')
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java193
1 files changed, 193 insertions, 0 deletions
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java
new file mode 100644
index 0000000..e37b3d7
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java
@@ -0,0 +1,193 @@
+package org.openslx.taskmanager.network;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.openslx.taskmanager.Global;
+
+/**
+ * The network listener that will receive incoming UDP packets, try to process
+ * them, and then send a reply.
+ */
+public class NetworkHandlerTcp extends NetworkHandlerBase
+{
+
+ private static final Logger log = Logger.getLogger( NetworkHandlerTcp.class );
+
+ /**
+ * UDP socket for sending and receiving.
+ */
+ private final ServerSocket socket;
+
+ private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 8, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
+
+ /**
+ * Initialize the NetworkHandler by starting threads and opening the socket.
+ */
+ public NetworkHandlerTcp( int port, InetAddress listenAddress, RequestParser parser ) throws IOException
+ {
+ super( parser );
+ socket = new ServerSocket( port, 10, listenAddress );
+ log.info( "Listening on TCP:" + port );
+ threadPool.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() );
+ }
+
+ public void shutdown()
+ {
+ try {
+ socket.close();
+ } catch ( IOException e ) {
+ }
+ }
+
+ /**
+ * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode
+ */
+ @Override
+ public void run()
+ {
+ try {
+ while ( !Global.doShutdown ) {
+ Socket client;
+ try {
+ client = socket.accept();
+ } catch ( IOException e1 ) {
+ log.warn( "ACCEPT fail", e1 );
+ break;
+ }
+ try {
+ threadPool.execute( new ClientTask( client ) );
+ } catch ( RejectedExecutionException e ) {
+ try {
+ client.close();
+ } catch ( IOException e1 ) {
+ }
+ }
+ }
+ } finally {
+ Thread.currentThread().interrupt();
+ Global.doShutdown = true;
+ log.info( "UDP receiver finished." );
+ }
+ }
+
+ private class ClientTask implements Runnable
+ {
+ private final Socket socket;
+ private DataOutputStream writer = null;
+ private DataInputStream reader = null;
+
+ public ClientTask( Socket client )
+ {
+ this.socket = client;
+ }
+
+ @Override
+ public void run()
+ {
+ try {
+ try {
+ socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) );
+ reader = new DataInputStream( socket.getInputStream() );
+ writer = new DataOutputStream( socket.getOutputStream() );
+ } catch ( IOException e ) {
+ log.info( "IOException on TCP socket when setting up streams", e );
+ return;
+ }
+ String payload = readMsg( reader );
+ if ( payload == null )
+ return;
+ int i = payload.indexOf( ' ' );
+ if ( i != -1 ) {
+ // For future extensibility we throw away everything after the first space (including the space)
+ payload = payload.substring( 0, i );
+ }
+ if ( !payload.equals( Global.PASSWORD ) ) {
+ sendMsg( "ERROR,Wrong password" );
+ return;
+ }
+ while ( !socket.isClosed() ) {
+ payload = readMsg( reader );
+ if ( payload == null )
+ return;
+ byte[] reply;
+ try {
+ reply = parser.handle( payload );
+ } catch ( Throwable t ) {
+ log.error( "Exception in RequestParser", t );
+ log.error( "Payload was: " + payload );
+ continue;
+ }
+ if ( reply != null ) {
+ sendMsg( reply );
+ }
+ }
+ } catch ( SendException e ) {
+ log.warn( "Cannot send reply to client", e );
+ } finally {
+ try {
+ if ( writer != null ) {
+ writer.flush();
+ writer.close();
+ }
+ socket.close();
+ } catch ( IOException e ) {
+ }
+ }
+ }
+
+ private void sendMsg( String reply ) throws SendException
+ {
+ sendMsg( reply.getBytes( StandardCharsets.UTF_8 ) );
+ }
+
+ private void sendMsg( byte[] reply ) throws SendException
+ {
+ try {
+ writer.writeInt( reply.length );
+ writer.write( reply );
+ } catch ( IOException e ) {
+ throw new SendException();
+ }
+ }
+
+ private String readMsg( DataInputStream reader )
+ {
+ int bytes;
+ try {
+ bytes = reader.readInt();
+ } catch ( IOException e ) {
+ // This should be fine... Client went away
+ return null;
+ }
+ if ( bytes < 0 || bytes > Global.MAX_REQUEST_SIZE ) {
+ log.info( "Invalid request size: " + bytes );
+ return null;
+ }
+ if ( bytes == 0 )
+ return ""; // Nothing to read
+ byte[] buffer = new byte[ bytes ];
+ try {
+ reader.readFully( buffer );
+ } catch ( IOException e ) {
+ log.warn( "Client went away when trying to read payload" );
+ return null;
+ }
+ return new String( buffer, StandardCharsets.UTF_8 );
+ }
+
+ }
+
+ private static class SendException extends Exception {}
+
+}