summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2015-01-27 15:48:22 +0100
committerSimon Rettberg2015-01-27 15:48:22 +0100
commitfd3c703e5a31b4194ce32a8ef651bc13a3d03cf1 (patch)
tree282c0b81edd6c63bb4553d506ac6ec06d190da28
parentRework plugin loading a little (diff)
downloadtaskman-lite-fd3c703e5a31b4194ce32a8ef651bc13a3d03cf1.tar.gz
taskman-lite-fd3c703e5a31b4194ce32a8ef651bc13a3d03cf1.tar.xz
taskman-lite-fd3c703e5a31b4194ce32a8ef651bc13a3d03cf1.zip
Make taskmanager an instance, add callback for finished jobs, improve parent dependency handling
-rw-r--r--api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java65
-rw-r--r--api/src/main/java/org/openslx/taskmanager/api/FinishCallback.java11
-rw-r--r--api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java4
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/App.java22
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java46
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java41
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java20
7 files changed, 128 insertions, 81 deletions
diff --git a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
index cc837ba..a1c81e8 100644
--- a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
+++ b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
@@ -21,19 +21,19 @@ public abstract class AbstractTask implements Runnable
* The id of the task instance.
*/
@Expose
- private String id = null;
+ private volatile String id = null;
/**
* Parent task. This task won't be started as long as the parent task currently
* waiting for execution or is being executed. Otherwise this task is available for execution.
* Note that MAX_INSTANCES is still being taken into account. Set to null to ignore.
*/
@Expose
- private String parentTask = null;
+ private volatile String parentTask = null;
/**
* If the parent task failed to execute, don't run this task and fail immediately.
*/
@Expose
- private boolean failOnParentFail = true;
+ private volatile boolean failOnParentFail = true;
/*
* Variables we're working with - these should never be set from incoming (json) data
@@ -57,11 +57,16 @@ public abstract class AbstractTask implements Runnable
/**
* Status of Task
*/
- private TaskStatus status = TaskStatus.ts_waiting;
+ private volatile TaskStatus status = TaskStatus.ts_waiting;
/**
* Reference to parent task
*/
- private AbstractTask parent = null;
+ private volatile AbstractTask parent = null;
+ /**
+ * Reference to "owner" so we can tell it to check for more
+ * work when we're done.
+ */
+ private volatile FinishCallback finishCallback = null;
/**
* Default constructor which should not be overridden
@@ -79,12 +84,24 @@ public abstract class AbstractTask implements Runnable
* Initialize the task; method used by the {@link Taskmanager}.
* Put your own initialization code in initTask()
*/
- public final boolean init( AbstractTask parent )
+ public final boolean init( AbstractTask parent, FinishCallback finishCallback )
{
if ( this.initDone ) {
LOG.fatal( "init() called twice on " + this.getClass().getSimpleName() );
System.exit( 1 );
}
+ if ( this.parentTask != null && this.parentTask.isEmpty() )
+ this.parentTask = null;
+ if ( this.parentTask != null && parent == null ) {
+ if ( this.failOnParentFail ) {
+ this.status = new TaskStatus( StatusCode.PARENT_FAILED, this.id );
+ return false;
+ }
+ this.parentTask = null;
+ }
+ if ( this.parentTask == null && parent != null )
+ parent = null;
+ this.finishCallback = finishCallback;
this.parent = parent;
this.status = new TaskStatus( StatusCode.TASK_WAITING, this.id );
this.initDone = true;
@@ -116,6 +133,19 @@ public abstract class AbstractTask implements Runnable
*/
protected abstract boolean execute();
+ /**
+ * This is called when a client requests the status of this task. In case you
+ * want to return complex structures like Lists, which are not thread safe, you
+ * might want to keep that list outside the status class you return, and only
+ * create a copy of it for the status class in this function.
+ * If you only return more or less atomic data, you don't need to override
+ * this function
+ */
+ protected void updateStatus()
+ {
+
+ }
+
/*
* Final methods
*/
@@ -126,7 +156,7 @@ public abstract class AbstractTask implements Runnable
* @return id of parent task, null if no parent set
*
*/
- public String getParentTaskId()
+ public final String getParentTaskId()
{
return this.parentTask;
}
@@ -151,7 +181,7 @@ public abstract class AbstractTask implements Runnable
*/
public final TaskStatus getStatus()
{
- if ( this.initDone && this.parentTask != null ) {
+ if ( this.initDone && this.parent != null ) {
final StatusCode parentStatus = parent.getStatusCode();
switch ( parentStatus ) {
case DUPLICATE_ID:
@@ -160,8 +190,10 @@ public abstract class AbstractTask implements Runnable
case NO_SUCH_CONSTRUCTOR:
case PARENT_FAILED:
case TASK_ERROR:
- if ( this.failOnParentFail )
+ if ( this.failOnParentFail ) {
this.status.statusCode = StatusCode.PARENT_FAILED;
+ LOG.debug( "Parent " + this.parentTask + " of " + this.id + " failed." );
+ }
this.parentTask = null;
break;
default:
@@ -267,18 +299,7 @@ public abstract class AbstractTask implements Runnable
} else {
this.status.statusCode = StatusCode.TASK_ERROR;
}
- }
-
- /**
- * This is called when a client requests the status of this task. In case you
- * want to return complex structures like Lists, which are not thread safe, you
- * might want to keep that list outside the status class you return, and only
- * create a copy of it for the status class in this function.
- * If you only return more or less atomic data, you don't need to override
- * this function
- */
- protected void updateStatus()
- {
-
+ if ( this.finishCallback != null )
+ this.finishCallback.taskFinished();
}
}
diff --git a/api/src/main/java/org/openslx/taskmanager/api/FinishCallback.java b/api/src/main/java/org/openslx/taskmanager/api/FinishCallback.java
new file mode 100644
index 0000000..a265595
--- /dev/null
+++ b/api/src/main/java/org/openslx/taskmanager/api/FinishCallback.java
@@ -0,0 +1,11 @@
+package org.openslx.taskmanager.api;
+
+public interface FinishCallback
+{
+
+ /**
+ * Called by a task after execution finished or failed.
+ */
+ public void taskFinished();
+
+}
diff --git a/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java
index f01ccc4..0aa9ef7 100644
--- a/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java
+++ b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java
@@ -26,11 +26,11 @@ public final class TaskStatus
/**
* Overall status of the task. Only set by base methods of the AbstractTask class.
*/
- protected StatusCode statusCode;
+ protected volatile StatusCode statusCode;
/**
* Custom data a task might want to return on status requests.
*/
- private Object data = null;
+ private volatile Object data = null;
@SuppressWarnings( "unused" )
private final String id;
diff --git a/daemon/src/main/java/org/openslx/taskmanager/App.java b/daemon/src/main/java/org/openslx/taskmanager/App.java
index 86cae4d..c7dfa18 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/App.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/App.java
@@ -3,12 +3,13 @@ package org.openslx.taskmanager;
import java.io.File;
import java.io.IOException;
import java.net.SocketException;
-
-import junit.runner.ClassPathTestCollector;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.log4j.BasicConfigurator;
import org.openslx.taskmanager.main.Taskmanager;
import org.openslx.taskmanager.network.NetworkHandler;
+import org.openslx.taskmanager.network.RequestParser;
import org.openslx.taskmanager.util.ClassLoaderHack;
/**
@@ -18,7 +19,7 @@ import org.openslx.taskmanager.util.ClassLoaderHack;
public class App
{
- public static void main( String[] args ) throws SocketException
+ public static void main( String[] args ) throws SocketException, InterruptedException
{
BasicConfigurator.configure();
// Load all task plugins
@@ -40,9 +41,18 @@ public class App
}
}
Environment.load( "config/environment" );
- NetworkHandler.init();
- Taskmanager.run();
+ List<Thread> threads = new ArrayList<>();
+ Taskmanager tm = new Taskmanager();
+ RequestParser parser = new RequestParser( tm );
+ NetworkHandler nh = new NetworkHandler( Global.LISTEN_PORT, Global.LISTEN_ADDRESS, parser );
+ threads.add( new Thread( tm ) );
+ threads.add( new Thread( nh ) );
// Wait for everything
- NetworkHandler.join();
+ for (Thread t : threads) {
+ t.start();
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
}
}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
index f3707ed..f9b7752 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
@@ -2,10 +2,10 @@ package org.openslx.taskmanager.main;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -14,6 +14,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
import org.openslx.taskmanager.api.AbstractTask;
+import org.openslx.taskmanager.api.FinishCallback;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.util.ClassLoaderHack;
import org.openslx.taskmanager.util.Util;
@@ -21,32 +22,32 @@ import org.openslx.taskmanager.util.Util;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
-public class Taskmanager
+public class Taskmanager implements FinishCallback, Runnable
{
private static final Logger log = Logger.getLogger( Taskmanager.class );
- private static final ExecutorService threadPool = Executors.newCachedThreadPool();
+ private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>( 16 ) );
/**
- * Static gson object for (de)serialization
+ * gson object for (de)serialization
*/
- private static final Gson gson = Util.explicitGsonInstance();
+ private final Gson gson = Util.explicitGsonInstance();
/**
* Cache of known tasks
*/
- private static final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();
+ private final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();
/**
* All the running/finished task instances. The mainloop will call wait() on this and this object
* is notified as soon as the mainloop should check if there is any task available that can be
* run.
*/
- private static final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();
+ private final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();
- private static final Lock workLock = new ReentrantLock();
- private static final Condition doCheckForWork = workLock.newCondition();
+ private final Lock workLock = new ReentrantLock();
+ private final Condition doCheckForWork = workLock.newCondition();
/*
* Static methods
@@ -61,7 +62,7 @@ public class Taskmanager
* @param taskId - ID of the task to retrieve the status of
* @return TaskStatus
*/
- public static TaskStatus getTaskStatus( final String taskId )
+ public TaskStatus getTaskStatus( final String taskId )
{
AbstractTask task = instances.get( taskId );
if ( task == null )
@@ -81,7 +82,7 @@ public class Taskmanager
* @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if
* there is no task registered under the given name.
*/
- public static TaskStatus submitTask( final String task, final String jsonData )
+ public TaskStatus submitTask( final String task, final String jsonData )
{
// Get task class
Class<? extends AbstractTask> taskClass;
@@ -108,7 +109,7 @@ public class Taskmanager
log.warn( task + " exists, but could not be instanciated!" );
return TaskStatus.ts_noSuchConstructor;
}
- if ( taskInstance.getId() == null ) {
+ if ( taskInstance.getId() == null || taskInstance.getId().isEmpty() ) {
log.warn( "Tried to launch " + task + " with null-id" );
return TaskStatus.ts_noSuchConstructor;
}
@@ -123,13 +124,12 @@ public class Taskmanager
AbstractTask parent = null;
if ( taskInstance.getParentTaskId() != null )
parent = instances.get( taskInstance.getParentTaskId() );
- if ( taskInstance.init( parent ) ) {
- checkForWork();
- }
+ taskInstance.init( parent, this );
+ checkForWork();
return taskInstance.getStatus();
}
- public static void releaseTask( String taskId )
+ public void releaseTask( String taskId )
{
final AbstractTask task = instances.get( taskId );
if ( task != null )
@@ -140,7 +140,7 @@ public class Taskmanager
* Wakes up the Taskmanager's mainloop so it will check if any of the current task instances
* is waiting for execution.
*/
- protected static void checkForWork()
+ protected void checkForWork()
{
workLock.lock();
try {
@@ -150,7 +150,14 @@ public class Taskmanager
}
}
- public static void run()
+ @Override
+ public void taskFinished()
+ {
+ checkForWork();
+ }
+
+ @Override
+ public void run()
{
try {
while ( !Global.doShutdown ) {
@@ -194,6 +201,7 @@ public class Taskmanager
} catch ( InterruptedException e ) {
log.info( "Interrupted!" );
}
+ System.exit( 0 );
}
}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java
index 3e2c8fd..5d98512 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java
@@ -3,6 +3,7 @@ package org.openslx.taskmanager.network;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
+import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
@@ -23,46 +24,33 @@ public class NetworkHandler implements Runnable
// Static part
- private static Thread recvThread = null;
- private static Thread sendThread = null;
+ private Thread sendThread = null;
/**
* Sender instance (Runnable handling outgoing packets)
*/
- private static Sender sender = null;
+ private final Sender sender;
/**
* UDP socket for sending and receiving.
*/
- private static DatagramSocket socket;
+ private final DatagramSocket socket;
+
+ private final RequestParser parser;
/**
* Initialize the NetworkHandler by starting threads and opening the socket.
*/
- public static void init() throws SocketException
+ public NetworkHandler( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException
{
- if ( recvThread != null )
- throw new RuntimeException( "Already initialized" );
- socket = new DatagramSocket( Global.LISTEN_PORT, Global.LISTEN_ADDRESS );
- recvThread = new Thread( new NetworkHandler() );
- recvThread.start();
+ socket = new DatagramSocket( port, listenAddress );
sendThread = new Thread( sender = new Sender() );
- sendThread.start();
+ this.parser = parser;
}
- public static void shutdown()
+ public void shutdown()
{
socket.close();
}
- public static void join()
- {
- try {
- recvThread.join();
- sendThread.join();
- } catch ( InterruptedException e ) {
- Thread.currentThread().interrupt();
- }
- }
-
// Class part
/**
@@ -97,6 +85,7 @@ public class NetworkHandler implements Runnable
{
byte readBuffer[] = new byte[ 66000 ];
try {
+ sendThread.start();
while ( !Global.doShutdown ) {
DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length );
try {
@@ -112,11 +101,12 @@ public class NetworkHandler implements Runnable
}
String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
try {
- byte[] reply = RequestParser.handle( payload );
+ byte[] reply = parser.handle( payload );
if ( reply != null )
send( packet.getSocketAddress(), reply );
} catch ( Throwable t ) {
- log.error( "Exception in RequestParser: " + t.getMessage() );
+ log.error( "Exception in RequestParser: " + t.toString() );
+ log.error( "Payload was: " + payload );
t.printStackTrace();
}
}
@@ -124,6 +114,7 @@ public class NetworkHandler implements Runnable
Thread.currentThread().interrupt();
} finally {
Global.doShutdown = true;
+ sendThread.interrupt();
log.info( "UDP receiver finished." );
}
}
@@ -132,7 +123,7 @@ public class NetworkHandler implements Runnable
* Private sending thread.
* Use blocking queue, wait for packet to be added to it, then try to send.
*/
- static class Sender implements Runnable
+ private class Sender implements Runnable
{
/**
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
index d2cfb21..5741e58 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
@@ -15,15 +15,21 @@ public class RequestParser
/**
* Our very own gson instance (for serializing replies)
*/
- private static final Gson sendGson = new Gson();
+ private final Gson sendGson = new Gson();
+
+ private final Taskmanager taskManager;
+
+ public RequestParser( Taskmanager tm )
+ {
+ this.taskManager = tm;
+ }
/**
* Handle the given unparsed request.
*
- * @param source source of the request, where the reply will be send to (if any)
* @param payload Packet data received from network, already converted to a string
*/
- public static byte[] handle( String payload )
+ public byte[] handle( String payload )
{
String[] parts = payload.split( " *, *", 3 );
// Message format is "<message id>, <command>, <command payload/argument>"
@@ -34,21 +40,21 @@ public class RequestParser
// Look at parts[1], if it's "status" it's a request for the task
// with the ID given in parts[2]
if ( parts[1].equals( "status" ) ) {
- TaskStatus status = Taskmanager.getTaskStatus( parts[2] );
+ TaskStatus status = taskManager.getTaskStatus( parts[2] );
return serialize( parts[0], status );
}
// Now check if parts[1] is "release"
if ( parts[1].equals( "release" ) ) {
- Taskmanager.releaseTask( parts[2] );
+ taskManager.releaseTask( parts[2] );
return null;
}
// Anything else in parts[0] will be treated as a fresh task invocation, so let's
// pass it on to the task manager.
- TaskStatus status = Taskmanager.submitTask( parts[1], parts[2] );
+ TaskStatus status = taskManager.submitTask( parts[1], parts[2] );
return serialize( parts[0], status );
}
- private static byte[] serialize( String messageId, TaskStatus status )
+ private byte[] serialize( String messageId, TaskStatus status )
{
String data;
try {