From fd3c703e5a31b4194ce32a8ef651bc13a3d03cf1 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 27 Jan 2015 15:48:22 +0100 Subject: Make taskmanager an instance, add callback for finished jobs, improve parent dependency handling --- .../org/openslx/taskmanager/api/AbstractTask.java | 65 ++++++++++++++-------- .../openslx/taskmanager/api/FinishCallback.java | 11 ++++ .../org/openslx/taskmanager/api/TaskStatus.java | 4 +- .../src/main/java/org/openslx/taskmanager/App.java | 22 ++++++-- .../org/openslx/taskmanager/main/Taskmanager.java | 46 ++++++++------- .../taskmanager/network/NetworkHandler.java | 41 ++++++-------- .../openslx/taskmanager/network/RequestParser.java | 20 ++++--- 7 files changed, 128 insertions(+), 81 deletions(-) create mode 100644 api/src/main/java/org/openslx/taskmanager/api/FinishCallback.java diff --git a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java index cc837ba..a1c81e8 100644 --- a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java +++ b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java @@ -21,19 +21,19 @@ public abstract class AbstractTask implements Runnable * The id of the task instance. */ @Expose - private String id = null; + private volatile String id = null; /** * Parent task. This task won't be started as long as the parent task currently * waiting for execution or is being executed. Otherwise this task is available for execution. * Note that MAX_INSTANCES is still being taken into account. Set to null to ignore. */ @Expose - private String parentTask = null; + private volatile String parentTask = null; /** * If the parent task failed to execute, don't run this task and fail immediately. */ @Expose - private boolean failOnParentFail = true; + private volatile boolean failOnParentFail = true; /* * Variables we're working with - these should never be set from incoming (json) data @@ -57,11 +57,16 @@ public abstract class AbstractTask implements Runnable /** * Status of Task */ - private TaskStatus status = TaskStatus.ts_waiting; + private volatile TaskStatus status = TaskStatus.ts_waiting; /** * Reference to parent task */ - private AbstractTask parent = null; + private volatile AbstractTask parent = null; + /** + * Reference to "owner" so we can tell it to check for more + * work when we're done. + */ + private volatile FinishCallback finishCallback = null; /** * Default constructor which should not be overridden @@ -79,12 +84,24 @@ public abstract class AbstractTask implements Runnable * Initialize the task; method used by the {@link Taskmanager}. * Put your own initialization code in initTask() */ - public final boolean init( AbstractTask parent ) + public final boolean init( AbstractTask parent, FinishCallback finishCallback ) { if ( this.initDone ) { LOG.fatal( "init() called twice on " + this.getClass().getSimpleName() ); System.exit( 1 ); } + if ( this.parentTask != null && this.parentTask.isEmpty() ) + this.parentTask = null; + if ( this.parentTask != null && parent == null ) { + if ( this.failOnParentFail ) { + this.status = new TaskStatus( StatusCode.PARENT_FAILED, this.id ); + return false; + } + this.parentTask = null; + } + if ( this.parentTask == null && parent != null ) + parent = null; + this.finishCallback = finishCallback; this.parent = parent; this.status = new TaskStatus( StatusCode.TASK_WAITING, this.id ); this.initDone = true; @@ -116,6 +133,19 @@ public abstract class AbstractTask implements Runnable */ protected abstract boolean execute(); + /** + * This is called when a client requests the status of this task. In case you + * want to return complex structures like Lists, which are not thread safe, you + * might want to keep that list outside the status class you return, and only + * create a copy of it for the status class in this function. + * If you only return more or less atomic data, you don't need to override + * this function + */ + protected void updateStatus() + { + + } + /* * Final methods */ @@ -126,7 +156,7 @@ public abstract class AbstractTask implements Runnable * @return id of parent task, null if no parent set * */ - public String getParentTaskId() + public final String getParentTaskId() { return this.parentTask; } @@ -151,7 +181,7 @@ public abstract class AbstractTask implements Runnable */ public final TaskStatus getStatus() { - if ( this.initDone && this.parentTask != null ) { + if ( this.initDone && this.parent != null ) { final StatusCode parentStatus = parent.getStatusCode(); switch ( parentStatus ) { case DUPLICATE_ID: @@ -160,8 +190,10 @@ public abstract class AbstractTask implements Runnable case NO_SUCH_CONSTRUCTOR: case PARENT_FAILED: case TASK_ERROR: - if ( this.failOnParentFail ) + if ( this.failOnParentFail ) { this.status.statusCode = StatusCode.PARENT_FAILED; + LOG.debug( "Parent " + this.parentTask + " of " + this.id + " failed." ); + } this.parentTask = null; break; default: @@ -267,18 +299,7 @@ public abstract class AbstractTask implements Runnable } else { this.status.statusCode = StatusCode.TASK_ERROR; } - } - - /** - * This is called when a client requests the status of this task. In case you - * want to return complex structures like Lists, which are not thread safe, you - * might want to keep that list outside the status class you return, and only - * create a copy of it for the status class in this function. - * If you only return more or less atomic data, you don't need to override - * this function - */ - protected void updateStatus() - { - + if ( this.finishCallback != null ) + this.finishCallback.taskFinished(); } } diff --git a/api/src/main/java/org/openslx/taskmanager/api/FinishCallback.java b/api/src/main/java/org/openslx/taskmanager/api/FinishCallback.java new file mode 100644 index 0000000..a265595 --- /dev/null +++ b/api/src/main/java/org/openslx/taskmanager/api/FinishCallback.java @@ -0,0 +1,11 @@ +package org.openslx.taskmanager.api; + +public interface FinishCallback +{ + + /** + * Called by a task after execution finished or failed. + */ + public void taskFinished(); + +} diff --git a/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java index f01ccc4..0aa9ef7 100644 --- a/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java +++ b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java @@ -26,11 +26,11 @@ public final class TaskStatus /** * Overall status of the task. Only set by base methods of the AbstractTask class. */ - protected StatusCode statusCode; + protected volatile StatusCode statusCode; /** * Custom data a task might want to return on status requests. */ - private Object data = null; + private volatile Object data = null; @SuppressWarnings( "unused" ) private final String id; diff --git a/daemon/src/main/java/org/openslx/taskmanager/App.java b/daemon/src/main/java/org/openslx/taskmanager/App.java index 86cae4d..c7dfa18 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/App.java +++ b/daemon/src/main/java/org/openslx/taskmanager/App.java @@ -3,12 +3,13 @@ package org.openslx.taskmanager; import java.io.File; import java.io.IOException; import java.net.SocketException; - -import junit.runner.ClassPathTestCollector; +import java.util.ArrayList; +import java.util.List; import org.apache.log4j.BasicConfigurator; import org.openslx.taskmanager.main.Taskmanager; import org.openslx.taskmanager.network.NetworkHandler; +import org.openslx.taskmanager.network.RequestParser; import org.openslx.taskmanager.util.ClassLoaderHack; /** @@ -18,7 +19,7 @@ import org.openslx.taskmanager.util.ClassLoaderHack; public class App { - public static void main( String[] args ) throws SocketException + public static void main( String[] args ) throws SocketException, InterruptedException { BasicConfigurator.configure(); // Load all task plugins @@ -40,9 +41,18 @@ public class App } } Environment.load( "config/environment" ); - NetworkHandler.init(); - Taskmanager.run(); + List threads = new ArrayList<>(); + Taskmanager tm = new Taskmanager(); + RequestParser parser = new RequestParser( tm ); + NetworkHandler nh = new NetworkHandler( Global.LISTEN_PORT, Global.LISTEN_ADDRESS, parser ); + threads.add( new Thread( tm ) ); + threads.add( new Thread( nh ) ); // Wait for everything - NetworkHandler.join(); + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } } } diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java index f3707ed..f9b7752 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java +++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java @@ -2,10 +2,10 @@ package org.openslx.taskmanager.main; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -14,6 +14,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; import org.openslx.taskmanager.Global; import org.openslx.taskmanager.api.AbstractTask; +import org.openslx.taskmanager.api.FinishCallback; import org.openslx.taskmanager.api.TaskStatus; import org.openslx.taskmanager.util.ClassLoaderHack; import org.openslx.taskmanager.util.Util; @@ -21,32 +22,32 @@ import org.openslx.taskmanager.util.Util; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; -public class Taskmanager +public class Taskmanager implements FinishCallback, Runnable { private static final Logger log = Logger.getLogger( Taskmanager.class ); - private static final ExecutorService threadPool = Executors.newCachedThreadPool(); + private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue( 16 ) ); /** - * Static gson object for (de)serialization + * gson object for (de)serialization */ - private static final Gson gson = Util.explicitGsonInstance(); + private final Gson gson = Util.explicitGsonInstance(); /** * Cache of known tasks */ - private static final Map> tasks = new ConcurrentHashMap<>(); + private final Map> tasks = new ConcurrentHashMap<>(); /** * All the running/finished task instances. The mainloop will call wait() on this and this object * is notified as soon as the mainloop should check if there is any task available that can be * run. */ - private static final Map instances = new ConcurrentHashMap<>(); + private final Map instances = new ConcurrentHashMap<>(); - private static final Lock workLock = new ReentrantLock(); - private static final Condition doCheckForWork = workLock.newCondition(); + private final Lock workLock = new ReentrantLock(); + private final Condition doCheckForWork = workLock.newCondition(); /* * Static methods @@ -61,7 +62,7 @@ public class Taskmanager * @param taskId - ID of the task to retrieve the status of * @return TaskStatus */ - public static TaskStatus getTaskStatus( final String taskId ) + public TaskStatus getTaskStatus( final String taskId ) { AbstractTask task = instances.get( taskId ); if ( task == null ) @@ -81,7 +82,7 @@ public class Taskmanager * @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if * there is no task registered under the given name. */ - public static TaskStatus submitTask( final String task, final String jsonData ) + public TaskStatus submitTask( final String task, final String jsonData ) { // Get task class Class taskClass; @@ -108,7 +109,7 @@ public class Taskmanager log.warn( task + " exists, but could not be instanciated!" ); return TaskStatus.ts_noSuchConstructor; } - if ( taskInstance.getId() == null ) { + if ( taskInstance.getId() == null || taskInstance.getId().isEmpty() ) { log.warn( "Tried to launch " + task + " with null-id" ); return TaskStatus.ts_noSuchConstructor; } @@ -123,13 +124,12 @@ public class Taskmanager AbstractTask parent = null; if ( taskInstance.getParentTaskId() != null ) parent = instances.get( taskInstance.getParentTaskId() ); - if ( taskInstance.init( parent ) ) { - checkForWork(); - } + taskInstance.init( parent, this ); + checkForWork(); return taskInstance.getStatus(); } - public static void releaseTask( String taskId ) + public void releaseTask( String taskId ) { final AbstractTask task = instances.get( taskId ); if ( task != null ) @@ -140,7 +140,7 @@ public class Taskmanager * Wakes up the Taskmanager's mainloop so it will check if any of the current task instances * is waiting for execution. */ - protected static void checkForWork() + protected void checkForWork() { workLock.lock(); try { @@ -150,7 +150,14 @@ public class Taskmanager } } - public static void run() + @Override + public void taskFinished() + { + checkForWork(); + } + + @Override + public void run() { try { while ( !Global.doShutdown ) { @@ -194,6 +201,7 @@ public class Taskmanager } catch ( InterruptedException e ) { log.info( "Interrupted!" ); } + System.exit( 0 ); } } diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java index 3e2c8fd..5d98512 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java @@ -3,6 +3,7 @@ package org.openslx.taskmanager.network; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; +import java.net.InetAddress; import java.net.SocketAddress; import java.net.SocketException; import java.nio.charset.StandardCharsets; @@ -23,46 +24,33 @@ public class NetworkHandler implements Runnable // Static part - private static Thread recvThread = null; - private static Thread sendThread = null; + private Thread sendThread = null; /** * Sender instance (Runnable handling outgoing packets) */ - private static Sender sender = null; + private final Sender sender; /** * UDP socket for sending and receiving. */ - private static DatagramSocket socket; + private final DatagramSocket socket; + + private final RequestParser parser; /** * Initialize the NetworkHandler by starting threads and opening the socket. */ - public static void init() throws SocketException + public NetworkHandler( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException { - if ( recvThread != null ) - throw new RuntimeException( "Already initialized" ); - socket = new DatagramSocket( Global.LISTEN_PORT, Global.LISTEN_ADDRESS ); - recvThread = new Thread( new NetworkHandler() ); - recvThread.start(); + socket = new DatagramSocket( port, listenAddress ); sendThread = new Thread( sender = new Sender() ); - sendThread.start(); + this.parser = parser; } - public static void shutdown() + public void shutdown() { socket.close(); } - public static void join() - { - try { - recvThread.join(); - sendThread.join(); - } catch ( InterruptedException e ) { - Thread.currentThread().interrupt(); - } - } - // Class part /** @@ -97,6 +85,7 @@ public class NetworkHandler implements Runnable { byte readBuffer[] = new byte[ 66000 ]; try { + sendThread.start(); while ( !Global.doShutdown ) { DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length ); try { @@ -112,11 +101,12 @@ public class NetworkHandler implements Runnable } String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 ); try { - byte[] reply = RequestParser.handle( payload ); + byte[] reply = parser.handle( payload ); if ( reply != null ) send( packet.getSocketAddress(), reply ); } catch ( Throwable t ) { - log.error( "Exception in RequestParser: " + t.getMessage() ); + log.error( "Exception in RequestParser: " + t.toString() ); + log.error( "Payload was: " + payload ); t.printStackTrace(); } } @@ -124,6 +114,7 @@ public class NetworkHandler implements Runnable Thread.currentThread().interrupt(); } finally { Global.doShutdown = true; + sendThread.interrupt(); log.info( "UDP receiver finished." ); } } @@ -132,7 +123,7 @@ public class NetworkHandler implements Runnable * Private sending thread. * Use blocking queue, wait for packet to be added to it, then try to send. */ - static class Sender implements Runnable + private class Sender implements Runnable { /** diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java index d2cfb21..5741e58 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java +++ b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java @@ -15,15 +15,21 @@ public class RequestParser /** * Our very own gson instance (for serializing replies) */ - private static final Gson sendGson = new Gson(); + private final Gson sendGson = new Gson(); + + private final Taskmanager taskManager; + + public RequestParser( Taskmanager tm ) + { + this.taskManager = tm; + } /** * Handle the given unparsed request. * - * @param source source of the request, where the reply will be send to (if any) * @param payload Packet data received from network, already converted to a string */ - public static byte[] handle( String payload ) + public byte[] handle( String payload ) { String[] parts = payload.split( " *, *", 3 ); // Message format is ", , " @@ -34,21 +40,21 @@ public class RequestParser // Look at parts[1], if it's "status" it's a request for the task // with the ID given in parts[2] if ( parts[1].equals( "status" ) ) { - TaskStatus status = Taskmanager.getTaskStatus( parts[2] ); + TaskStatus status = taskManager.getTaskStatus( parts[2] ); return serialize( parts[0], status ); } // Now check if parts[1] is "release" if ( parts[1].equals( "release" ) ) { - Taskmanager.releaseTask( parts[2] ); + taskManager.releaseTask( parts[2] ); return null; } // Anything else in parts[0] will be treated as a fresh task invocation, so let's // pass it on to the task manager. - TaskStatus status = Taskmanager.submitTask( parts[1], parts[2] ); + TaskStatus status = taskManager.submitTask( parts[1], parts[2] ); return serialize( parts[0], status ); } - private static byte[] serialize( String messageId, TaskStatus status ) + private byte[] serialize( String messageId, TaskStatus status ) { String data; try { -- cgit v1.2.3-55-g7522