From efb5ad9f5fe48a77b6cd14e7bd2b25e3b13ecb1f Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 3 Jun 2014 16:44:56 +0200 Subject: Initial commit --- .../org/openslx/taskmanager/main/Taskmanager.java | 200 +++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java (limited to 'daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java') diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java new file mode 100644 index 0000000..f3707ed --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java @@ -0,0 +1,200 @@ +package org.openslx.taskmanager.main; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.Global; +import org.openslx.taskmanager.api.AbstractTask; +import org.openslx.taskmanager.api.TaskStatus; +import org.openslx.taskmanager.util.ClassLoaderHack; +import org.openslx.taskmanager.util.Util; + +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; + +public class Taskmanager +{ + + private static final Logger log = Logger.getLogger( Taskmanager.class ); + + private static final ExecutorService threadPool = Executors.newCachedThreadPool(); + + /** + * Static gson object for (de)serialization + */ + private static final Gson gson = Util.explicitGsonInstance(); + + /** + * Cache of known tasks + */ + private static final Map> tasks = new ConcurrentHashMap<>(); + + /** + * All the running/finished task instances. The mainloop will call wait() on this and this object + * is notified as soon as the mainloop should check if there is any task available that can be + * run. + */ + private static final Map instances = new ConcurrentHashMap<>(); + + private static final Lock workLock = new ReentrantLock(); + private static final Condition doCheckForWork = workLock.newCondition(); + + /* + * Static methods + */ + + /** + * Return the status of the task with the given ID. If the task does not + * exist, a pseudo-status instance is returned, with a status code of + * NO_SUCH_TASK. This means it is guaranteed that this function never returns + * null. + * + * @param taskId - ID of the task to retrieve the status of + * @return TaskStatus + */ + public static TaskStatus getTaskStatus( final String taskId ) + { + AbstractTask task = instances.get( taskId ); + if ( task == null ) + return TaskStatus.ts_noSuchInstance; + return task.getStatus(); + } + + /** + * Run the requested method as a new task. The json data may contain an explicit id for that + * task, otherwise a random id is generated. If there's already a task running with the desired + * id, an error is returned, and no new task will be created. The desired id has to be added to + * the json data, as a field called "id". + * + * @param task - The task name to be executed. + * @param jsonData - JsonData to be passed to the task. All fields except "id" are ignored by the + * task manager. + * @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if + * there is no task registered under the given name. + */ + public static TaskStatus submitTask( final String task, final String jsonData ) + { + // Get task class + Class taskClass; + synchronized ( tasks ) { + taskClass = tasks.get( task ); + if ( taskClass == null ) { // Not in map; either never called yet, or doesn't exist + taskClass = ClassLoaderHack.getClass( Global.TASK_PACKAGE_NAME, task, AbstractTask.class ); + if ( taskClass == null ) { // Simply doesn't exist + log.warn( "Could not find " + task + " in " + Global.TASK_PACKAGE_NAME ); + return TaskStatus.ts_noSuchTask; + } + tasks.put( task, taskClass ); // Cache for all future calls + } + } + // Instantiate using Gson + final AbstractTask taskInstance; + try { + taskInstance = gson.fromJson( jsonData, taskClass ); + } catch ( JsonSyntaxException e ) { + log.warn( "Invocation request for " + task + " with invalid json: " + jsonData ); + return TaskStatus.ts_jsonError; + } + if ( taskInstance == null ) { + log.warn( task + " exists, but could not be instanciated!" ); + return TaskStatus.ts_noSuchConstructor; + } + if ( taskInstance.getId() == null ) { + log.warn( "Tried to launch " + task + " with null-id" ); + return TaskStatus.ts_noSuchConstructor; + } + // Now check for id collision + synchronized ( instances ) { + if ( instances.containsKey( taskInstance.getId() ) ) { + log.info( "Ignoring task invocation of " + task + ": Duplicate ID: " + taskInstance.getId() ); + return TaskStatus.ts_duplicateId; + } + instances.put( taskInstance.getId(), taskInstance ); + } + AbstractTask parent = null; + if ( taskInstance.getParentTaskId() != null ) + parent = instances.get( taskInstance.getParentTaskId() ); + if ( taskInstance.init( parent ) ) { + checkForWork(); + } + return taskInstance.getStatus(); + } + + public static void releaseTask( String taskId ) + { + final AbstractTask task = instances.get( taskId ); + if ( task != null ) + task.release(); + } + + /** + * Wakes up the Taskmanager's mainloop so it will check if any of the current task instances + * is waiting for execution. + */ + protected static void checkForWork() + { + workLock.lock(); + try { + doCheckForWork.signalAll(); + } finally { + workLock.unlock(); + } + } + + public static void run() + { + try { + while ( !Global.doShutdown ) { + workLock.lock(); + try { + doCheckForWork.await( 1, TimeUnit.MINUTES ); + } finally { + workLock.unlock(); + } + try { + for ( Iterator it = instances.values().iterator(); it.hasNext(); ) { + AbstractTask task = it.next(); + if ( task.canBeReleased() ) { + it.remove(); + log.debug( "Released task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" ); + continue; + } + if ( task.canStart() ) { + threadPool.execute( task ); + log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" ); + } + } + } catch ( RejectedExecutionException e ) { + log.warn( "ThreadPool rejected a task (" + e.getMessage() + ")" ); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + log.info( "Taskmanager mainloop finished." ); + Global.doShutdown = true; + log.info( "Shutting down worker thread pool...." ); + threadPool.shutdown(); + try { + if ( threadPool.awaitTermination( 5, TimeUnit.MINUTES ) ) { + log.info( "Thread pool shut down!" ); + } else { + log.info( "Trying to kill still running tasks...." ); + threadPool.shutdownNow(); + } + } catch ( InterruptedException e ) { + log.info( "Interrupted!" ); + } + } + } + +} -- cgit v1.2.3-55-g7522