summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2021-04-19 13:03:10 +0200
committerSimon Rettberg2021-04-19 13:03:10 +0200
commit67a4f15659514288f287816b233ea92ae153c099 (patch)
tree661934c9ef194e7a3872d2d89777808130ec3928
parent[api] Make env unmodifiable at assignment time (diff)
downloadtaskman-lite-67a4f15659514288f287816b233ea92ae153c099.tar.gz
taskman-lite-67a4f15659514288f287816b233ea92ae153c099.tar.xz
taskman-lite-67a4f15659514288f287816b233ea92ae153c099.zip
[daemon] Assign thread names, fine tune socket timeoutsv3.10
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/App.java6
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java12
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java30
3 files changed, 36 insertions, 12 deletions
diff --git a/daemon/src/main/java/org/openslx/taskmanager/App.java b/daemon/src/main/java/org/openslx/taskmanager/App.java
index 3b8eb66..6580550 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/App.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/App.java
@@ -37,13 +37,13 @@ public class App
NetworkHandlerTcp tcp = null;
if (Global.PORT_UDP != -1) {
udp = new NetworkHandlerUdp( Global.PORT_UDP, Global.LISTEN_ADDRESS, parser );
- threads.add( new Thread( udp ) );
+ threads.add( new Thread( udp, "UDP:" + Global.PORT_UDP ) );
}
if (Global.PORT_TCP != -1) {
tcp = new NetworkHandlerTcp( Global.PORT_TCP, Global.LISTEN_ADDRESS, parser );
- threads.add( new Thread( tcp ) );
+ threads.add( new Thread( tcp, "TCP:" + Global.PORT_TCP ) );
}
- threads.add( new Thread( tm ) );
+ threads.add( new Thread( tm, "Taskmanager" ) );
// Wait for everything
for (Thread t : threads) {
t.start();
diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
index 7b854c3..74fc1b7 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
@@ -6,8 +6,10 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
@@ -27,7 +29,15 @@ public class Taskmanager implements FinishCallback, Runnable
private static final Logger log = Logger.getLogger( Taskmanager.class );
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 8, 32, 1, TimeUnit.MINUTES,
- new ArrayBlockingQueue<Runnable>( 4 ) );
+ new ArrayBlockingQueue<Runnable>( 4 ), new ThreadFactory() {
+ AtomicInteger id = new AtomicInteger();
+
+ @Override
+ public Thread newThread( Runnable r )
+ {
+ return new Thread( r, "Task-" + id.incrementAndGet() );
+ }
+ } );
/**
* gson object for (de)serialization
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java
index 540ff42..8b8b6d5 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandlerTcp.java
@@ -9,8 +9,10 @@ import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
@@ -29,7 +31,15 @@ public class NetworkHandlerTcp extends NetworkHandlerBase
*/
private final ServerSocket socket;
- private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 8, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
+ private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 0, 64, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ AtomicInteger id = new AtomicInteger();
+
+ @Override
+ public Thread newThread( Runnable r )
+ {
+ return new Thread( r, "TCP-" + id.incrementAndGet() );
+ }
+ } );
/**
* Initialize the NetworkHandler by starting threads and opening the socket.
@@ -77,7 +87,7 @@ public class NetworkHandlerTcp extends NetworkHandlerBase
} finally {
Thread.currentThread().interrupt();
Global.doShutdown = true;
- log.info( "UDP receiver finished." );
+ log.info( "TCP receiver finished." );
}
}
@@ -86,7 +96,7 @@ public class NetworkHandlerTcp extends NetworkHandlerBase
private final Socket socket;
private DataOutputStream writer = null;
private DataInputStream reader = null;
-
+
public ClientTask( Socket client )
{
this.socket = client;
@@ -97,7 +107,6 @@ public class NetworkHandlerTcp extends NetworkHandlerBase
{
try {
try {
- socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) );
reader = new DataInputStream( socket.getInputStream() );
writer = new DataOutputStream( socket.getOutputStream() );
} catch ( IOException e ) {
@@ -145,7 +154,7 @@ public class NetworkHandlerTcp extends NetworkHandlerBase
}
}
}
-
+
private void sendMsg( String reply ) throws SendException
{
sendMsg( reply.getBytes( StandardCharsets.UTF_8 ) );
@@ -154,6 +163,7 @@ public class NetworkHandlerTcp extends NetworkHandlerBase
private void sendMsg( byte[] reply ) throws SendException
{
try {
+ socket.setSoTimeout( (int)TimeUnit.SECONDS.toMillis( 15 ) );
writer.writeInt( reply.length );
writer.write( reply );
} catch ( IOException e ) {
@@ -165,6 +175,7 @@ public class NetworkHandlerTcp extends NetworkHandlerBase
{
int bytes;
try {
+ socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) );
bytes = reader.readInt();
} catch ( IOException e ) {
// This should be fine... Client went away
@@ -178,16 +189,19 @@ public class NetworkHandlerTcp extends NetworkHandlerBase
return ""; // Nothing to read
byte[] buffer = new byte[ bytes ];
try {
+ socket.setSoTimeout( (int)TimeUnit.SECONDS.toMillis( 2 ) );
reader.readFully( buffer );
} catch ( IOException e ) {
log.warn( "Client went away when trying to read payload" );
return null;
}
return new String( buffer, StandardCharsets.UTF_8 );
- }
+ }
}
-
- private static class SendException extends Exception {}
+
+ private static class SendException extends Exception
+ {
+ }
}