summaryrefslogblamecommitdiffstats
path: root/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
blob: b4df03df60baeab33d844448e569afcd13442822 (plain) (tree)
1
2
3
4
5
6
7
8
9



                                     
                                               
                                              
                                                       
                                      
                                               
                                     



                                                
                                                   
                                                  






                                                    
                                                            



                                                                                
                                                                                                                                                   

           
                                            
           
                                                              



                               
                                                                                                   

           
                                                   
           
                                                                                      
 
                                                                    













                                                                                     
                                                              


















                                                                                                         
                                                                                

























                                                                                                                           
                                                                                       













                                                                                                                              

                                                  


                                                
                                                





                                                                  










                                                                  



                                                                                                    
                                     
         
                                         

         







                                  


                                                      

                                                                                 








                                                                                                                                                    
                                                                                                                                                   
                                                                                   
                                                                              






















                                                                                                          
                                         



                 
package org.openslx.taskmanager.main;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
import org.openslx.taskmanager.api.AbstractTask;
import org.openslx.taskmanager.api.CancellableTask;
import org.openslx.taskmanager.api.FinishCallback;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.util.ClassLoaderHack;
import org.openslx.taskmanager.util.Util;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;

/**
 * Central task registry and scheduler. Task instances are created from JSON via
 * {@link #submitTask(String, String)} and stored in an id -> instance map. The
 * {@link #run()} main loop periodically walks that map, removes instances that
 * report they can be released and hands instances that report they can start to
 * the worker thread pool.
 */
public class Taskmanager implements FinishCallback, Runnable
{

	private static final Logger log = Logger.getLogger( Taskmanager.class );

	/**
	 * Worker pool executing the tasks: 4 core threads, at most 16 threads, idle
	 * non-core threads are reclaimed after one minute, and up to 16 tasks may be
	 * queued. Submissions beyond that are rejected by the pool.
	 */
	private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>( 16 ) );

	/**
	 * gson object for (de)serialization
	 */
	private final Gson gson = Util.explicitGsonInstance();

	/**
	 * Cache of known tasks
	 */
	private final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();

	/**
	 * All the running/finished task instances.
	 */
	private final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();

	/**
	 * Signals the main loop that it should scan the task instances. Any number of
	 * accumulated permits results in a single scan, as the loop drains them.
	 */
	private final Semaphore doCheckForWork = new Semaphore( 0 );

	/*
	 * Public API
	 */

	/**
	 * Return the status of the task with the given ID. If the task does not
	 * exist, a pseudo-status instance is returned, with a status code of
	 * NO_SUCH_TASK. This means it is guaranteed that this function never returns
	 * null.
	 * 
	 * @param taskId - ID of the task to retrieve the status of
	 * @return TaskStatus
	 */
	public TaskStatus getTaskStatus( final String taskId )
	{
		AbstractTask task = instances.get( taskId );
		if ( task == null )
			return TaskStatus.ts_noSuchInstance;
		return task.getStatus();
	}

	/**
	 * Run the requested method as a new task. The json data may contain an explicit id for that
	 * task, otherwise a random id is generated. If there's already a task running with the desired
	 * id, an error is returned, and no new task will be created. The desired id has to be added to
	 * the json data, as a field called "id".
	 * 
	 * @param task - The task name to be executed.
	 * @param jsonData - JsonData to be passed to the task. All fields except "id" are ignored by the
	 *           task manager.
	 * @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if
	 *         there is no task registered under the given name. Malformed json, a failed
	 *         instantiation, a missing id or a duplicate id yield the respective error status.
	 */
	public TaskStatus submitTask( final String task, final String jsonData )
	{
		// Get task class
		Class<? extends AbstractTask> taskClass;
		synchronized ( tasks ) {
			taskClass = tasks.get( task );
			if ( taskClass == null ) { // Not in map; either never called yet, or doesn't exist
				taskClass = ClassLoaderHack.getClass( Global.TASK_PACKAGE_NAME, task, AbstractTask.class );
				if ( taskClass == null ) { // Simply doesn't exist
					log.warn( "Could not find " + task + " in " + Global.TASK_PACKAGE_NAME );
					return TaskStatus.ts_noSuchTask;
				}
				tasks.put( task, taskClass ); // Cache for all future calls
			}
		}
		// Instantiate using Gson
		final AbstractTask taskInstance;
		try {
			taskInstance = gson.fromJson( jsonData, taskClass );
		} catch ( JsonSyntaxException e ) {
			// Pass the exception along so the log shows where parsing failed
			log.warn( "Invocation request for " + task + " with invalid json: " + jsonData, e );
			return TaskStatus.ts_jsonError;
		}
		if ( taskInstance == null ) {
			log.warn( task + " exists, but could not be instanciated!" );
			return TaskStatus.ts_noSuchConstructor;
		}
		final String taskId = taskInstance.getId();
		if ( taskId == null || taskId.isEmpty() ) {
			log.warn( "Tried to launch " + task + " with null-id" );
			return TaskStatus.ts_noSuchConstructor;
		}
		// Now check for id collision; check and insert must happen atomically
		// with respect to other submitters
		synchronized ( instances ) {
			if ( instances.containsKey( taskId ) ) {
				log.info( "Ignoring task invocation of " + task + ": Duplicate ID: " + taskId );
				return TaskStatus.ts_duplicateId;
			}
			instances.put( taskId, taskInstance );
		}
		// NOTE(review): the instance is visible to the main loop before init() has
		// been called - presumably canStart() is false until then; confirm in AbstractTask.
		AbstractTask parent = null;
		final String parentId = taskInstance.getParentTaskId();
		if ( parentId != null )
			parent = instances.get( parentId );
		taskInstance.init( parent, this );
		checkForWork();
		return taskInstance.getStatus();
	}

	/**
	 * Calls release() on the task with the given ID, if such a task instance is
	 * known, and wakes up the main loop so it can re-evaluate the instance right
	 * away instead of waiting for the next periodic scan. Unknown IDs are ignored.
	 * 
	 * @param taskId - ID of the task to release
	 */
	public void releaseTask( String taskId )
	{
		final AbstractTask task = instances.get( taskId );
		if ( task == null )
			return;
		task.release();
		checkForWork();
	}

	/**
	 * Cancels the task with the given ID, provided it implements
	 * {@link CancellableTask}.
	 * 
	 * @param taskId - ID of the task to cancel
	 * @return the status of the task after cancel() has been called on it; a
	 *         pseudo-status if there is no instance with that ID or if the task
	 *         is not cancellable
	 */
	public TaskStatus cancelTask( String taskId )
	{
		final AbstractTask task = instances.get( taskId );
		if ( task == null )
			return TaskStatus.ts_noSuchInstance;
		if ( !(task instanceof CancellableTask) )
			return TaskStatus.ts_notCancellable;
		( (CancellableTask)task ).cancel();
		return task.getStatus();
	}

	/**
	 * Wakes up the Taskmanager's mainloop so it will check if any of the current task instances
	 * is waiting for execution.
	 */
	protected void checkForWork()
	{
		doCheckForWork.release();
	}

	/**
	 * Callback from a task; triggers a new scan of the task instances.
	 */
	@Override
	public void taskFinished()
	{
		checkForWork();
	}

	/**
	 * Main loop. Until {@code Global.doShutdown} is set (or the thread is
	 * interrupted), waits for a wake-up signal or one minute, whichever comes
	 * first, then removes releasable task instances and submits startable ones to
	 * the thread pool. When the loop ends, the thread pool is shut down, giving
	 * running tasks up to five minutes to finish, and the JVM is terminated via
	 * {@code System.exit( 0 )}.
	 */
	@Override
	public void run()
	{
		try {
			while ( !Global.doShutdown ) {
				// Result is irrelevant: we scan on signal as well as on timeout
				doCheckForWork.tryAcquire( 1, TimeUnit.MINUTES );
				doCheckForWork.drainPermits();
				// Once the pool rejected a task, do not try to start more during this
				// pass, but keep iterating so releasable instances still get removed.
				boolean poolRejected = false;
				for ( Iterator<AbstractTask> it = instances.values().iterator(); it.hasNext(); ) {
					AbstractTask task = it.next();
					if ( task.canBeReleased() ) {
						it.remove();
						log.debug( "Released task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
						continue;
					}
					if ( poolRejected || !task.canStart() )
						continue;
					try {
						threadPool.execute( task );
					} catch ( RejectedExecutionException e ) {
						log.warn( "ThreadPool rejected a task (" + e.getMessage() + ")" );
						poolRejected = true;
						continue;
					}
					// Only reached if the pool accepted the task; a rejected task is
					// not marked and is considered again on a later pass.
					task.markAsStarting();
					log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
				}
			}
		} catch ( InterruptedException e ) {
			// Deliberately not restoring the interrupt flag here: the finally block
			// has to wait for the thread pool, and awaitTermination() would throw
			// immediately if the flag were set. This thread ends with System.exit().
			log.info( "Taskmanager mainloop interrupted." );
		} finally {
			log.info( "Taskmanager mainloop finished." );
			Global.doShutdown = true;
			log.info( "Shutting down worker thread pool...." );
			threadPool.shutdown();
			// In case we got here through an unchecked exception while an interrupt
			// was pending, clear it so the grace period below is honoured.
			Thread.interrupted();
			try {
				if ( threadPool.awaitTermination( 5, TimeUnit.MINUTES ) ) {
					log.info( "Thread pool shut down!" );
				} else {
					log.info( "Trying to kill still running tasks...." );
					threadPool.shutdownNow();
				}
			} catch ( InterruptedException e ) {
				// Interrupted while waiting: stop waiting and cancel what is left
				log.info( "Interrupted!" );
				threadPool.shutdownNow();
			}
			System.exit( 0 );
		}
	}

}