diff options
Diffstat (limited to 'daemon/src/main/java/org/openslx/taskmanager/network')
-rw-r--r-- | daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java | 12 | ||||
-rw-r--r-- | daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java | 193 | ||||
-rw-r--r-- | daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java (renamed from daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java) | 21 |
3 files changed, 215 insertions, 11 deletions
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java new file mode 100644 index 0000000..03dc32f --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerBase.java @@ -0,0 +1,12 @@ +package org.openslx.taskmanager.network; + +public abstract class NetworkHandlerBase implements Runnable +{ + protected final RequestParser parser; + + public NetworkHandlerBase( RequestParser parser ) + { + this.parser = parser; + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java new file mode 100644 index 0000000..e37b3d7 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java @@ -0,0 +1,193 @@ +package org.openslx.taskmanager.network; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.Global; + +/** + * The network listener that will receive incoming UDP packets, try to process + * them, and then send a reply. + */ +public class NetworkHandlerTcp extends NetworkHandlerBase +{ + + private static final Logger log = Logger.getLogger( NetworkHandlerTcp.class ); + + /** + * UDP socket for sending and receiving. + */ + private final ServerSocket socket; + + private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 8, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() ); + + /** + * Initialize the NetworkHandler by starting threads and opening the socket. + */ + public NetworkHandlerTcp( int port, InetAddress listenAddress, RequestParser parser ) throws IOException + { + super( parser ); + socket = new ServerSocket( port, 10, listenAddress ); + log.info( "Listening on TCP:" + port ); + threadPool.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() ); + } + + public void shutdown() + { + try { + socket.close(); + } catch ( IOException e ) { + } + } + + /** + * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode + */ + @Override + public void run() + { + try { + while ( !Global.doShutdown ) { + Socket client; + try { + client = socket.accept(); + } catch ( IOException e1 ) { + log.warn( "ACCEPT fail", e1 ); + break; + } + try { + threadPool.execute( new ClientTask( client ) ); + } catch ( RejectedExecutionException e ) { + try { + client.close(); + } catch ( IOException e1 ) { + } + } + } + } finally { + Thread.currentThread().interrupt(); + Global.doShutdown = true; + log.info( "UDP receiver finished." ); + } + } + + private class ClientTask implements Runnable + { + private final Socket socket; + private DataOutputStream writer = null; + private DataInputStream reader = null; + + public ClientTask( Socket client ) + { + this.socket = client; + } + + @Override + public void run() + { + try { + try { + socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) ); + reader = new DataInputStream( socket.getInputStream() ); + writer = new DataOutputStream( socket.getOutputStream() ); + } catch ( IOException e ) { + log.info( "IOException on TCP socket when setting up streams", e ); + return; + } + String payload = readMsg( reader ); + if ( payload == null ) + return; + int i = payload.indexOf( ' ' ); + if ( i != -1 ) { + // For future extensibility we throw away everything after the first space (including the space) + payload = payload.substring( 0, i ); + } + if ( !payload.equals( Global.PASSWORD ) ) { + sendMsg( "ERROR,Wrong password" ); + return; + } + while ( !socket.isClosed() ) { + payload = readMsg( reader ); + if ( payload == null ) + return; + byte[] reply; + try { + reply = parser.handle( payload ); + } catch ( Throwable t ) { + log.error( "Exception in RequestParser", t ); + log.error( "Payload was: " + payload ); + continue; + } + if ( reply != null ) { + sendMsg( reply ); + } + } + } catch ( SendException e ) { + log.warn( "Cannot send reply to client", e ); + } finally { + try { + if ( writer != null ) { + writer.flush(); + writer.close(); + } + socket.close(); + } catch ( IOException e ) { + } + } + } + + private void sendMsg( String reply ) throws SendException + { + sendMsg( reply.getBytes( StandardCharsets.UTF_8 ) ); + } + + private void sendMsg( byte[] reply ) throws SendException + { + try { + writer.writeInt( reply.length ); + writer.write( reply ); + } catch ( IOException e ) { + throw new SendException(); + } + } + + private String readMsg( DataInputStream reader ) + { + int bytes; + try { + bytes = reader.readInt(); + } catch ( IOException e ) { + // This should be fine... Client went away + return null; + } + if ( bytes < 0 || bytes > Global.MAX_REQUEST_SIZE ) { + log.info( "Invalid request size: " + bytes ); + return null; + } + if ( bytes == 0 ) + return ""; // Nothing to read + byte[] buffer = new byte[ bytes ]; + try { + reader.readFully( buffer ); + } catch ( IOException e ) { + log.warn( "Client went away when trying to read payload" ); + return null; + } + return new String( buffer, StandardCharsets.UTF_8 ); + } + + } + + private static class SendException extends Exception {} + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java index 6946cd1..4c12960 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerUdp.java @@ -17,12 +17,10 @@ import org.openslx.taskmanager.Global; * The network listener that will receive incoming UDP packets, try to process * them, and then send a reply. */ -public class NetworkHandler implements Runnable +public class NetworkHandlerUdp extends NetworkHandlerBase { - private static final Logger log = Logger.getLogger( NetworkHandler.class ); - - // Static part + private static final Logger log = Logger.getLogger( NetworkHandlerUdp.class ); private Thread sendThread = null; /** @@ -34,16 +32,15 @@ public class NetworkHandler implements Runnable */ private final DatagramSocket socket; - private final RequestParser parser; - /** * Initialize the NetworkHandler by starting threads and opening the socket. */ - public NetworkHandler( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException + public NetworkHandlerUdp( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException { + super(parser); socket = new DatagramSocket( port, listenAddress ); + log.info( "Listening on UDP:" + port ); sendThread = new Thread( sender = new Sender() ); - this.parser = parser; } public void shutdown() @@ -100,15 +97,17 @@ public class NetworkHandler implements Runnable continue; } String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 ); + byte[] reply; try { - byte[] reply = parser.handle( payload ); - if ( reply != null ) - send( packet.getSocketAddress(), reply ); + reply = parser.handle( payload ); } catch ( Throwable t ) { log.error( "Exception in RequestParser: " + t.toString() ); log.error( "Payload was: " + payload ); t.printStackTrace(); + continue; } + if ( reply != null ) + send( packet.getSocketAddress(), reply ); } } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); |