From 67a4f15659514288f287816b233ea92ae153c099 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 19 Apr 2021 13:03:10 +0200 Subject: [daemon] Assign thread names, fine tune socket timeouts --- .../src/main/java/org/openslx/taskmanager/App.java | 6 ++--- .../org/openslx/taskmanager/main/Taskmanager.java | 12 ++++++++- .../taskmanager/network/NetworkHandlerTcp.java | 30 ++++++++++++++++------ 3 files changed, 36 insertions(+), 12 deletions(-) (limited to 'daemon') diff --git a/daemon/src/main/java/org/openslx/taskmanager/App.java b/daemon/src/main/java/org/openslx/taskmanager/App.java index 3b8eb66..6580550 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/App.java +++ b/daemon/src/main/java/org/openslx/taskmanager/App.java @@ -37,13 +37,13 @@ public class App NetworkHandlerTcp tcp = null; if (Global.PORT_UDP != -1) { udp = new NetworkHandlerUdp( Global.PORT_UDP, Global.LISTEN_ADDRESS, parser ); - threads.add( new Thread( udp ) ); + threads.add( new Thread( udp, "UDP:" + Global.PORT_UDP ) ); } if (Global.PORT_TCP != -1) { tcp = new NetworkHandlerTcp( Global.PORT_TCP, Global.LISTEN_ADDRESS, parser ); - threads.add( new Thread( tcp ) ); + threads.add( new Thread( tcp, "TCP:" + Global.PORT_TCP ) ); } - threads.add( new Thread( tm ) ); + threads.add( new Thread( tm, "Taskmanager" ) ); // Wait for everything for (Thread t : threads) { t.start(); diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java index 7b854c3..74fc1b7 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java +++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java @@ -6,8 +6,10 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import org.openslx.taskmanager.Global; @@ -27,7 +29,15 @@ public class Taskmanager implements FinishCallback, Runnable private static final Logger log = Logger.getLogger( Taskmanager.class ); private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 8, 32, 1, TimeUnit.MINUTES, - new ArrayBlockingQueue( 4 ) ); + new ArrayBlockingQueue( 4 ), new ThreadFactory() { + AtomicInteger id = new AtomicInteger(); + + @Override + public Thread newThread( Runnable r ) + { + return new Thread( r, "Task-" + id.incrementAndGet() ); + } + } ); /** * gson object for (de)serialization diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java index 540ff42..8b8b6d5 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java @@ -9,8 +9,10 @@ import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import org.openslx.taskmanager.Global; @@ -29,7 +31,15 @@ public class NetworkHandlerTcp extends NetworkHandlerBase */ private final ServerSocket socket; - private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 8, 1, TimeUnit.MINUTES, new SynchronousQueue() ); + private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 0, 64, 1, TimeUnit.MINUTES, new SynchronousQueue(), new ThreadFactory() { + AtomicInteger id = new AtomicInteger(); + + @Override + public Thread newThread( Runnable r ) + { + return new Thread( r, "TCP-" + id.incrementAndGet() ); + } + } ); /** * Initialize the NetworkHandler by starting threads and opening the socket. @@ -77,7 +87,7 @@ public class NetworkHandlerTcp extends NetworkHandlerBase } finally { Thread.currentThread().interrupt(); Global.doShutdown = true; - log.info( "UDP receiver finished." ); + log.info( "TCP receiver finished." ); } } @@ -86,7 +96,7 @@ public class NetworkHandlerTcp extends NetworkHandlerBase private final Socket socket; private DataOutputStream writer = null; private DataInputStream reader = null; - + public ClientTask( Socket client ) { this.socket = client; @@ -97,7 +107,6 @@ public class NetworkHandlerTcp extends NetworkHandlerBase { try { try { - socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) ); reader = new DataInputStream( socket.getInputStream() ); writer = new DataOutputStream( socket.getOutputStream() ); } catch ( IOException e ) { @@ -145,7 +154,7 @@ public class NetworkHandlerTcp extends NetworkHandlerBase } } } - + private void sendMsg( String reply ) throws SendException { sendMsg( reply.getBytes( StandardCharsets.UTF_8 ) ); @@ -154,6 +163,7 @@ public class NetworkHandlerTcp extends NetworkHandlerBase private void sendMsg( byte[] reply ) throws SendException { try { + socket.setSoTimeout( (int)TimeUnit.SECONDS.toMillis( 15 ) ); writer.writeInt( reply.length ); writer.write( reply ); } catch ( IOException e ) { @@ -165,6 +175,7 @@ public class NetworkHandlerTcp extends NetworkHandlerBase { int bytes; try { + socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) ); bytes = reader.readInt(); } catch ( IOException e ) { // This should be fine... Client went away @@ -178,16 +189,19 @@ public class NetworkHandlerTcp extends NetworkHandlerBase return ""; // Nothing to read byte[] buffer = new byte[ bytes ]; try { + socket.setSoTimeout( (int)TimeUnit.SECONDS.toMillis( 2 ) ); reader.readFully( buffer ); } catch ( IOException e ) { log.warn( "Client went away when trying to read payload" ); return null; } return new String( buffer, StandardCharsets.UTF_8 ); - } + } } - - private static class SendException extends Exception {} + + private static class SendException extends Exception + { + } } -- cgit v1.2.3-55-g7522