summaryrefslogblamecommitdiffstats
path: root/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
blob: f3707ed23a3d9a6c6a5211b1b99fa8291b4440c2 (plain) (tree)







































































































































































































                                                                                                                                                    
package org.openslx.taskmanager.main;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
import org.openslx.taskmanager.api.AbstractTask;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.util.ClassLoaderHack;
import org.openslx.taskmanager.util.Util;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;

public class Taskmanager
{

	private static final Logger log = Logger.getLogger( Taskmanager.class );

	private static final ExecutorService threadPool = Executors.newCachedThreadPool();

	/**
	 * Static gson object for (de)serialization
	 */
	private static final Gson gson = Util.explicitGsonInstance();

	/**
	 * Cache of known tasks
	 */
	private static final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();

	/**
	 * All the running/finished task instances. The mainloop will call wait() on this and this object
	 * is notified as soon as the mainloop should check if there is any task available that can be
	 * run.
	 */
	private static final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();

	private static final Lock workLock = new ReentrantLock();
	private static final Condition doCheckForWork = workLock.newCondition();

	/*
	 * Static methods
	 */

	/**
	 * Return the status of the task with the given ID. If the task does not
	 * exist, a pseudo-status instance is returned, with a status code of
	 * NO_SUCH_TASK. This means it is guaranteed that this function never returns
	 * null.
	 * 
	 * @param taskId - ID of the task to retrieve the status of
	 * @return TaskStatus
	 */
	public static TaskStatus getTaskStatus( final String taskId )
	{
		AbstractTask task = instances.get( taskId );
		if ( task == null )
			return TaskStatus.ts_noSuchInstance;
		return task.getStatus();
	}

	/**
	 * Run the requested method as a new task. The json data may contain an explicit id for that
	 * task, otherwise a random id is generated. If there's already a task running with the desired
	 * id, an error is returned, and no new task will be created. The desired id has to be added to
	 * the json data, as a field called "id".
	 * 
	 * @param task - The task name to be executed.
	 * @param jsonData - JsonData to be passed to the task. All fields except "id" are ignored by the
	 *           task manager.
	 * @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if
	 *         there is no task registered under the given name.
	 */
	public static TaskStatus submitTask( final String task, final String jsonData )
	{
		// Get task class
		Class<? extends AbstractTask> taskClass;
		synchronized ( tasks ) {
			taskClass = tasks.get( task );
			if ( taskClass == null ) { // Not in map; either never called yet, or doesn't exist
				taskClass = ClassLoaderHack.getClass( Global.TASK_PACKAGE_NAME, task, AbstractTask.class );
				if ( taskClass == null ) { // Simply doesn't exist
					log.warn( "Could not find " + task + " in " + Global.TASK_PACKAGE_NAME );
					return TaskStatus.ts_noSuchTask;
				}
				tasks.put( task, taskClass ); // Cache for all future calls
			}
		}
		// Instantiate using Gson
		final AbstractTask taskInstance;
		try {
			taskInstance = gson.fromJson( jsonData, taskClass );
		} catch ( JsonSyntaxException e ) {
			log.warn( "Invocation request for " + task + " with invalid json: " + jsonData );
			return TaskStatus.ts_jsonError;
		}
		if ( taskInstance == null ) {
			log.warn( task + " exists, but could not be instanciated!" );
			return TaskStatus.ts_noSuchConstructor;
		}
		if ( taskInstance.getId() == null ) {
			log.warn( "Tried to launch " + task + " with null-id" );
			return TaskStatus.ts_noSuchConstructor;
		}
		// Now check for id collision
		synchronized ( instances ) {
			if ( instances.containsKey( taskInstance.getId() ) ) {
				log.info( "Ignoring task invocation of " + task + ": Duplicate ID: " + taskInstance.getId() );
				return TaskStatus.ts_duplicateId;
			}
			instances.put( taskInstance.getId(), taskInstance );
		}
		AbstractTask parent = null;
		if ( taskInstance.getParentTaskId() != null )
			parent = instances.get( taskInstance.getParentTaskId() );
		if ( taskInstance.init( parent ) ) {
			checkForWork();
		}
		return taskInstance.getStatus();
	}

	public static void releaseTask( String taskId )
	{
		final AbstractTask task = instances.get( taskId );
		if ( task != null )
			task.release();
	}

	/**
	 * Wakes up the Taskmanager's mainloop so it will check if any of the current task instances
	 * is waiting for execution.
	 */
	protected static void checkForWork()
	{
		workLock.lock();
		try {
			doCheckForWork.signalAll();
		} finally {
			workLock.unlock();
		}
	}

	public static void run()
	{
		try {
			while ( !Global.doShutdown ) {
				workLock.lock();
				try {
					doCheckForWork.await( 1, TimeUnit.MINUTES );
				} finally {
					workLock.unlock();
				}
				try {
					for ( Iterator<AbstractTask> it = instances.values().iterator(); it.hasNext(); ) {
						AbstractTask task = it.next();
						if ( task.canBeReleased() ) {
							it.remove();
							log.debug( "Released task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
							continue;
						}
						if ( task.canStart() ) {
							threadPool.execute( task );
							log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
						}
					}
				} catch ( RejectedExecutionException e ) {
					log.warn( "ThreadPool rejected a task (" + e.getMessage() + ")" );
				}
			}
		} catch ( InterruptedException e ) {
			Thread.currentThread().interrupt();
		} finally {
			log.info( "Taskmanager mainloop finished." );
			Global.doShutdown = true;
			log.info( "Shutting down worker thread pool...." );
			threadPool.shutdown();
			try {
				if ( threadPool.awaitTermination( 5, TimeUnit.MINUTES ) ) {
					log.info( "Thread pool shut down!" );
				} else {
					log.info( "Trying to kill still running tasks...." );
					threadPool.shutdownNow();
				}
			} catch ( InterruptedException e ) {
				log.info( "Interrupted!" );
			}
		}
	}

}