package org.openslx.taskmanager.main;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
import org.openslx.taskmanager.api.AbstractTask;
import org.openslx.taskmanager.api.CancellableTask;
import org.openslx.taskmanager.api.FinishCallback;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.util.ClassLoaderHack;
import org.openslx.taskmanager.util.Util;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;

/**
 * Keeps track of all task instances and hands those that are ready to run
 * over to a worker thread pool. The scheduling itself happens in
 * {@link #run()}, which is meant to be executed on a dedicated thread; the
 * other public methods only look up or register instances and wake up that
 * loop when needed.
 */
public class Taskmanager implements FinishCallback, Runnable
{

    private static final Logger log = Logger.getLogger( Taskmanager.class );

    /**
     * Worker pool executing the task instances: 4 core threads, up to 16
     * threads, idle threads above the core size are kept for one minute, and
     * at most 16 tasks may be queued before submissions get rejected.
     */
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 4, 16, 1, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>( 16 ) );

    /**
     * gson object for (de)serialization
     */
    private final Gson gson = Util.explicitGsonInstance();

    /**
     * Cache of known task classes, keyed by task name.
     */
    private final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();

    /**
     * All the running/finished task instances, keyed by task id.
     */
    private final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();

    /**
     * Signals the main loop that it should scan the instances again.
     */
    private final Semaphore doCheckForWork = new Semaphore( 0 );

    /*
     * Public API
     */

    /**
     * Return the status of the task with the given ID. If the task does not
     * exist, the pseudo-status instance {@code TaskStatus.ts_noSuchInstance}
     * is returned instead of null.
     *
     * @param taskId - ID of the task to retrieve the status of
     * @return status of the task, or {@code TaskStatus.ts_noSuchInstance} if
     *         no instance is registered under that id
     */
    public TaskStatus getTaskStatus( final String taskId )
    {
        final AbstractTask task = instances.get( taskId );
        if ( task == null ) {
            return TaskStatus.ts_noSuchInstance;
        }
        return task.getStatus();
    }

    /**
     * Run the requested task. The task class is looked up by name in
     * {@code Global.TASK_PACKAGE_NAME} (and cached), then instantiated by
     * deserializing the given json data into it.
     *
     * The id of the new instance is taken from the deserialized object. This
     * method does not generate ids itself: an instance whose id is null or
     * empty after deserialization is rejected. (NOTE(review): the task base
     * class presumably assigns a default id when the json contains none -
     * not visible here, please confirm.) If an instance with the same id is
     * already registered, no new task is created.
     *
     * @param task - The task name to be executed.
     * @param jsonData - Json data the task instance is deserialized from.
     * @return the TaskStatus of the newly created task on success, otherwise
     *         one of the pseudo-status instances
     *         {@code TaskStatus.ts_noSuchTask} (no class found for the given
     *         name), {@code TaskStatus.ts_jsonError} (json could not be
     *         parsed), {@code TaskStatus.ts_noSuchConstructor} (no instance
     *         could be created, or its id is null/empty) or
     *         {@code TaskStatus.ts_duplicateId} (id already in use).
     */
    public TaskStatus submitTask( final String task, final String jsonData )
    {
        // Get task class
        Class<? extends AbstractTask> taskClass;
        synchronized ( tasks ) {
            taskClass = tasks.get( task );
            if ( taskClass == null ) {
                // Not in map; either never called yet, or doesn't exist
                taskClass = ClassLoaderHack.getClass( Global.TASK_PACKAGE_NAME, task, AbstractTask.class );
                if ( taskClass == null ) {
                    // Simply doesn't exist
                    log.warn( "Could not find " + task + " in " + Global.TASK_PACKAGE_NAME );
                    return TaskStatus.ts_noSuchTask;
                }
                tasks.put( task, taskClass ); // Cache for all future calls
            }
        }
        // Instantiate using Gson
        final AbstractTask taskInstance;
        try {
            taskInstance = gson.fromJson( jsonData, taskClass );
        } catch ( JsonSyntaxException e ) {
            log.warn( "Invocation request for " + task + " with invalid json: " + jsonData );
            return TaskStatus.ts_jsonError;
        }
        if ( taskInstance == null ) {
            log.warn( task + " exists, but could not be instanciated!" );
            return TaskStatus.ts_noSuchConstructor;
        }
        final String id = taskInstance.getId();
        if ( id == null || id.isEmpty() ) {
            log.warn( "Tried to launch " + task + " with null-id" );
            return TaskStatus.ts_noSuchConstructor;
        }
        // Now check for id collision; check and insert must happen atomically
        synchronized ( instances ) {
            if ( instances.containsKey( id ) ) {
                log.info( "Ignoring task invocation of " + task + ": Duplicate ID: " + id );
                return TaskStatus.ts_duplicateId;
            }
            instances.put( id, taskInstance );
        }
        // Resolve the optional parent; stays null if it is unknown
        AbstractTask parent = null;
        if ( taskInstance.getParentTaskId() != null ) {
            parent = instances.get( taskInstance.getParentTaskId() );
        }
        taskInstance.init( parent, this );
        checkForWork();
        return taskInstance.getStatus();
    }

    /**
     * Call release() on the task with the given id. Does nothing if no
     * instance is registered under that id.
     *
     * @param taskId - ID of the task to release
     */
    public void releaseTask( String taskId )
    {
        final AbstractTask task = instances.get( taskId );
        if ( task != null ) {
            task.release();
        }
    }

    /**
     * Cancel the task with the given id, if it supports cancellation.
     *
     * @param taskId - ID of the task to cancel
     * @return the status of the task after cancel() has been called on it,
     *         {@code TaskStatus.ts_noSuchInstance} if no instance is
     *         registered under that id, or
     *         {@code TaskStatus.ts_notCancellable} if the task does not
     *         implement {@link CancellableTask}.
     */
    public TaskStatus cancelTask( String taskId )
    {
        final AbstractTask task = instances.get( taskId );
        if ( task == null ) {
            return TaskStatus.ts_noSuchInstance;
        }
        if ( ! ( task instanceof CancellableTask ) ) {
            return TaskStatus.ts_notCancellable;
        }
        ( (CancellableTask)task ).cancel();
        return task.getStatus();
    }

    /**
     * Wakes up the Taskmanager's mainloop so it will check if any of the
     * current task instances is waiting for execution.
     */
    protected void checkForWork()
    {
        doCheckForWork.release();
    }

    /**
     * Callback from a task instance; triggers another scan of the instances.
     */
    @Override
    public void taskFinished()
    {
        checkForWork();
    }

    /**
     * Main loop. Until {@code Global.doShutdown} is set, waits for a wake-up
     * (or at most one minute) and then scans all instances: instances that
     * can be released are removed, instances that can start are handed to
     * the thread pool.
     *
     * When the loop ends, for whatever reason, the thread pool is shut down
     * and the JVM is terminated via System.exit( 0 ).
     */
    @Override
    public void run()
    {
        try {
            while ( !Global.doShutdown ) {
                // Result ignored on purpose: a timeout just means a periodic scan
                doCheckForWork.tryAcquire( 1, TimeUnit.MINUTES );
                // Collapse all pending wake-ups into this single scan
                doCheckForWork.drainPermits();
                try {
                    for ( Iterator<AbstractTask> it = instances.values().iterator(); it.hasNext(); ) {
                        final AbstractTask task = it.next();
                        if ( task.canBeReleased() ) {
                            it.remove();
                            log.debug( "Released task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
                            continue;
                        }
                        if ( task.canStart() ) {
                            // Mark only after the pool accepted the task, so a
                            // rejected task is picked up again by a later scan
                            threadPool.execute( task );
                            task.markAsStarting();
                            log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
                        }
                    }
                } catch ( RejectedExecutionException e ) {
                    // Pool is saturated; abort this scan and retry on next wake-up
                    log.warn( "ThreadPool rejected a task (" + e.getMessage() + ")" );
                }
            }
        } catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
        } finally {
            log.info( "Taskmanager mainloop finished." );
            Global.doShutdown = true;
            log.info( "Shutting down worker thread pool...." );
            threadPool.shutdown();
            try {
                if ( threadPool.awaitTermination( 5, TimeUnit.MINUTES ) ) {
                    log.info( "Thread pool shut down!" );
                } else {
                    log.info( "Trying to kill still running tasks...." );
                    threadPool.shutdownNow();
                }
            } catch ( InterruptedException e ) {
                // Also reached immediately if the main loop itself was
                // interrupted, since the interrupt flag was restored above.
                // Give the workers a chance to react before the JVM exits.
                log.info( "Interrupted!" );
                threadPool.shutdownNow();
            }
            System.exit( 0 );
        }
    }

}