package org.openslx.taskmanager.main;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
import org.openslx.taskmanager.api.AbstractTask;
import org.openslx.taskmanager.api.CancellableTask;
import org.openslx.taskmanager.api.FinishCallback;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.util.ClassLoaderHack;
import org.openslx.taskmanager.util.Util;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
public class Taskmanager implements FinishCallback, Runnable
{
private static final Logger log = Logger.getLogger( Taskmanager.class );
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>( 16 ) );
/**
* gson object for (de)serialization
*/
private final Gson gson = Util.explicitGsonInstance();
/**
* Cache of known tasks
*/
private final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();
/**
* All the running/finished task instances.
*/
private final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();
private final Semaphore doCheckForWork = new Semaphore( 0 );
/*
* Static methods
*/
/**
* Return the status of the task with the given ID. If the task does not
* exist, a pseudo-status instance is returned, with a status code of
* NO_SUCH_TASK. This means it is guaranteed that this function never returns
* null.
*
* @param taskId - ID of the task to retrieve the status of
* @return TaskStatus
*/
public TaskStatus getTaskStatus( final String taskId )
{
AbstractTask task = instances.get( taskId );
if ( task == null )
return TaskStatus.ts_noSuchInstance;
return task.getStatus();
}
/**
* Run the requested method as a new task. The json data may contain an explicit id for that
* task, otherwise a random id is generated. If there's already a task running with the desired
* id, an error is returned, and no new task will be created. The desired id has to be added to
* the json data, as a field called "id".
*
* @param task - The task name to be executed.
* @param jsonData - JsonData to be passed to the task. All fields except "id" are ignored by the
* task manager.
* @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if
* there is no task registered under the given name.
*/
public TaskStatus submitTask( final String task, final String jsonData )
{
// Get task class
Class<? extends AbstractTask> taskClass;
synchronized ( tasks ) {
taskClass = tasks.get( task );
if ( taskClass == null ) { // Not in map; either never called yet, or doesn't exist
taskClass = ClassLoaderHack.getClass( Global.TASK_PACKAGE_NAME, task, AbstractTask.class );
if ( taskClass == null ) { // Simply doesn't exist
log.warn( "Could not find " + task + " in " + Global.TASK_PACKAGE_NAME );
return TaskStatus.ts_noSuchTask;
}
tasks.put( task, taskClass ); // Cache for all future calls
}
}
// Instantiate using Gson
final AbstractTask taskInstance;
try {
taskInstance = gson.fromJson( jsonData, taskClass );
} catch ( JsonSyntaxException e ) {
log.warn( "Invocation request for " + task + " with invalid json: " + jsonData, e );
return TaskStatus.ts_jsonError;
}
if ( taskInstance == null ) {
log.warn( task + " exists, but could not be instanciated!" );
return TaskStatus.ts_noSuchConstructor;
}
if ( taskInstance.getId() == null || taskInstance.getId().isEmpty() ) {
log.warn( "Tried to launch " + task + " with null-id" );
return TaskStatus.ts_noSuchConstructor;
}
// Now check for id collision
synchronized ( instances ) {
AbstractTask existing = instances.get( taskInstance.getId() );
if ( existing != null && !existing.isFinished() ) {
log.info( "Ignoring task invocation of " + task + ": Duplicate ID: " + taskInstance.getId() );
return TaskStatus.ts_duplicateId;
}
instances.put( taskInstance.getId(), taskInstance );
}
AbstractTask parent = null;
if ( taskInstance.getParentTaskId() != null )
parent = instances.get( taskInstance.getParentTaskId() );
taskInstance.init( parent, this );
checkForWork();
return taskInstance.getStatus();
}
public void releaseTask( String taskId )
{
final AbstractTask task = instances.get( taskId );
if ( task != null )
task.release();
}
public TaskStatus cancelTask( String taskId )
{
final AbstractTask task = instances.get( taskId );
if ( task == null )
return TaskStatus.ts_noSuchInstance;
if ( !(task instanceof CancellableTask) )
return TaskStatus.ts_notCancellable;
( (CancellableTask)task ).cancel();
return task.getStatus();
}
/**
* Wakes up the Taskmanager's mainloop so it will check if any of the current task instances
* is waiting for execution.
*/
protected void checkForWork()
{
doCheckForWork.release();
}
@Override
public void taskFinished()
{
checkForWork();
}
@Override
public void run()
{
try {
while ( !Global.doShutdown ) {
doCheckForWork.tryAcquire( 1, TimeUnit.MINUTES );
doCheckForWork.drainPermits();
try {
for ( Iterator<AbstractTask> it = instances.values().iterator(); it.hasNext(); ) {
AbstractTask task = it.next();
if ( task.canBeReleased() ) {
it.remove();
log.debug( "Released task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
continue;
}
if ( task.canStart() ) {
log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
threadPool.execute( task );
task.markAsStarting();
}
}
} catch ( RejectedExecutionException e ) {
log.warn( "ThreadPool rejected a task (" + e.getMessage() + ")" );
}
}
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
} finally {
log.info( "Taskmanager mainloop finished." );
Global.doShutdown = true;
log.info( "Shutting down worker thread pool...." );
threadPool.shutdown();
try {
if ( threadPool.awaitTermination( 5, TimeUnit.MINUTES ) ) {
log.info( "Thread pool shut down!" );
} else {
log.info( "Trying to kill still running tasks...." );
threadPool.shutdownNow();
}
} catch ( InterruptedException e ) {
log.info( "Interrupted!" );
}
System.exit( 0 );
}
}
}