summaryrefslogtreecommitdiffstats
path: root/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
diff options
context:
space:
mode:
Diffstat (limited to 'daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java')
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java200
1 files changed, 200 insertions, 0 deletions
diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
new file mode 100644
index 0000000..f3707ed
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
@@ -0,0 +1,200 @@
+package org.openslx.taskmanager.main;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.openslx.taskmanager.Global;
+import org.openslx.taskmanager.api.AbstractTask;
+import org.openslx.taskmanager.api.TaskStatus;
+import org.openslx.taskmanager.util.ClassLoaderHack;
+import org.openslx.taskmanager.util.Util;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
+public class Taskmanager
+{
+
+ private static final Logger log = Logger.getLogger( Taskmanager.class );
+
+ private static final ExecutorService threadPool = Executors.newCachedThreadPool();
+
+ /**
+ * Static gson object for (de)serialization
+ */
+ private static final Gson gson = Util.explicitGsonInstance();
+
+ /**
+ * Cache of known tasks
+ */
+ private static final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();
+
+ /**
+ * All the running/finished task instances. The mainloop will call wait() on this and this object
+ * is notified as soon as the mainloop should check if there is any task available that can be
+ * run.
+ */
+ private static final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();
+
+ private static final Lock workLock = new ReentrantLock();
+ private static final Condition doCheckForWork = workLock.newCondition();
+
+ /*
+ * Static methods
+ */
+
+ /**
+ * Return the status of the task with the given ID. If the task does not
+ * exist, a pseudo-status instance is returned, with a status code of
+ * NO_SUCH_TASK. This means it is guaranteed that this function never returns
+ * null.
+ *
+ * @param taskId - ID of the task to retrieve the status of
+ * @return TaskStatus
+ */
+ public static TaskStatus getTaskStatus( final String taskId )
+ {
+ AbstractTask task = instances.get( taskId );
+ if ( task == null )
+ return TaskStatus.ts_noSuchInstance;
+ return task.getStatus();
+ }
+
+ /**
+ * Run the requested method as a new task. The json data may contain an explicit id for that
+ * task, otherwise a random id is generated. If there's already a task running with the desired
+ * id, an error is returned, and no new task will be created. The desired id has to be added to
+ * the json data, as a field called "id".
+ *
+ * @param task - The task name to be executed.
+ * @param jsonData - JsonData to be passed to the task. All fields except "id" are ignored by the
+ * task manager.
+ * @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if
+ * there is no task registered under the given name.
+ */
+ public static TaskStatus submitTask( final String task, final String jsonData )
+ {
+ // Get task class
+ Class<? extends AbstractTask> taskClass;
+ synchronized ( tasks ) {
+ taskClass = tasks.get( task );
+ if ( taskClass == null ) { // Not in map; either never called yet, or doesn't exist
+ taskClass = ClassLoaderHack.getClass( Global.TASK_PACKAGE_NAME, task, AbstractTask.class );
+ if ( taskClass == null ) { // Simply doesn't exist
+ log.warn( "Could not find " + task + " in " + Global.TASK_PACKAGE_NAME );
+ return TaskStatus.ts_noSuchTask;
+ }
+ tasks.put( task, taskClass ); // Cache for all future calls
+ }
+ }
+ // Instantiate using Gson
+ final AbstractTask taskInstance;
+ try {
+ taskInstance = gson.fromJson( jsonData, taskClass );
+ } catch ( JsonSyntaxException e ) {
+ log.warn( "Invocation request for " + task + " with invalid json: " + jsonData );
+ return TaskStatus.ts_jsonError;
+ }
+ if ( taskInstance == null ) {
+ log.warn( task + " exists, but could not be instanciated!" );
+ return TaskStatus.ts_noSuchConstructor;
+ }
+ if ( taskInstance.getId() == null ) {
+ log.warn( "Tried to launch " + task + " with null-id" );
+ return TaskStatus.ts_noSuchConstructor;
+ }
+ // Now check for id collision
+ synchronized ( instances ) {
+ if ( instances.containsKey( taskInstance.getId() ) ) {
+ log.info( "Ignoring task invocation of " + task + ": Duplicate ID: " + taskInstance.getId() );
+ return TaskStatus.ts_duplicateId;
+ }
+ instances.put( taskInstance.getId(), taskInstance );
+ }
+ AbstractTask parent = null;
+ if ( taskInstance.getParentTaskId() != null )
+ parent = instances.get( taskInstance.getParentTaskId() );
+ if ( taskInstance.init( parent ) ) {
+ checkForWork();
+ }
+ return taskInstance.getStatus();
+ }
+
+ public static void releaseTask( String taskId )
+ {
+ final AbstractTask task = instances.get( taskId );
+ if ( task != null )
+ task.release();
+ }
+
+ /**
+ * Wakes up the Taskmanager's mainloop so it will check if any of the current task instances
+ * is waiting for execution.
+ */
+ protected static void checkForWork()
+ {
+ workLock.lock();
+ try {
+ doCheckForWork.signalAll();
+ } finally {
+ workLock.unlock();
+ }
+ }
+
+ public static void run()
+ {
+ try {
+ while ( !Global.doShutdown ) {
+ workLock.lock();
+ try {
+ doCheckForWork.await( 1, TimeUnit.MINUTES );
+ } finally {
+ workLock.unlock();
+ }
+ try {
+ for ( Iterator<AbstractTask> it = instances.values().iterator(); it.hasNext(); ) {
+ AbstractTask task = it.next();
+ if ( task.canBeReleased() ) {
+ it.remove();
+ log.debug( "Released task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
+ continue;
+ }
+ if ( task.canStart() ) {
+ threadPool.execute( task );
+ log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
+ }
+ }
+ } catch ( RejectedExecutionException e ) {
+ log.warn( "ThreadPool rejected a task (" + e.getMessage() + ")" );
+ }
+ }
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ } finally {
+ log.info( "Taskmanager mainloop finished." );
+ Global.doShutdown = true;
+ log.info( "Shutting down worker thread pool...." );
+ threadPool.shutdown();
+ try {
+ if ( threadPool.awaitTermination( 5, TimeUnit.MINUTES ) ) {
+ log.info( "Thread pool shut down!" );
+ } else {
+ log.info( "Trying to kill still running tasks...." );
+ threadPool.shutdownNow();
+ }
+ } catch ( InterruptedException e ) {
+ log.info( "Interrupted!" );
+ }
+ }
+ }
+
+}