From efb5ad9f5fe48a77b6cd14e7bd2b25e3b13ecb1f Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 3 Jun 2014 16:44:56 +0200 Subject: Initial commit --- api/pom.xml | 66 +++++ .../org/openslx/taskmanager/api/AbstractTask.java | 284 +++++++++++++++++++++ .../openslx/taskmanager/api/SystemCommandTask.java | 169 ++++++++++++ .../org/openslx/taskmanager/api/TaskStatus.java | 130 ++++++++++ 4 files changed, 649 insertions(+) create mode 100644 api/pom.xml create mode 100644 api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java create mode 100644 api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java create mode 100644 api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java (limited to 'api') diff --git a/api/pom.xml b/api/pom.xml new file mode 100644 index 0000000..48a9beb --- /dev/null +++ b/api/pom.xml @@ -0,0 +1,66 @@ + + 4.0.0 + org.openslx.taskmanager + taskmanager-api + jar + 1.0-SNAPSHOT + taskmanager-api + http://maven.apache.org + + + UTF-8 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.7 + 1.7 + + + + maven-assembly-plugin + + + package + + single + + + + + + jar-with-dependencies + + + + + + + + + log4j + log4j + 1.2.17 + compile + + + org.slf4j + slf4j-log4j12 + 1.7.5 + compile + + + com.google.code.gson + gson + 2.2.4 + compile + + + + diff --git a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java new file mode 100644 index 0000000..cc837ba --- /dev/null +++ b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java @@ -0,0 +1,284 @@ +package org.openslx.taskmanager.api; + +import java.util.UUID; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.api.TaskStatus.StatusCode; + +import com.google.gson.annotations.Expose; + +public abstract class AbstractTask implements Runnable +{ + + private static final long RELEASE_DELAY = 5l * 60l * 1000l; + private static final Logger LOG = Logger.getLogger( AbstractTask.class ); + + /* + * To be set from task invocation (json data) + */ + + /** + * The id of the task instance. + */ + @Expose + private String id = null; + /** + * Parent task. This task won't be started as long as the parent task currently + * waiting for execution or is being executed. Otherwise this task is available for execution. + * Note that MAX_INSTANCES is still being taken into account. Set to null to ignore. + */ + @Expose + private String parentTask = null; + /** + * If the parent task failed to execute, don't run this task and fail immediately. + */ + @Expose + private boolean failOnParentFail = true; + + /* + * Variables we're working with - these should never be set from incoming (json) data + */ + + /** + * Maximum age of a task, if it's not freed explicitly. + */ + private static final long MAX_TASK_AGE = 24l * 3600l * 1000l; + /** + * timeMillis when this task will be removed. This will be set automatically to something + * reasonable like 24 hours, once the job has finished. This is to prevent clogging the task + * manager with finished jobs over time. Note that you can explicitly remove a job using the + * "release" command. + */ + private volatile long removalDeadline = System.currentTimeMillis() + MAX_TASK_AGE; + /** + * True if init() has been called. Task will not start if this is false. + */ + private volatile boolean initDone = false; + /** + * Status of Task + */ + private TaskStatus status = TaskStatus.ts_waiting; + /** + * Reference to parent task + */ + private AbstractTask parent = null; + + /** + * Default constructor which should not be overridden + */ + public AbstractTask() + { + this.id = UUID.randomUUID().toString(); + } + + /* + * Overridable methods + */ + + /** + * Initialize the task; method used by the {@link Taskmanager}. + * Put your own initialization code in initTask() + */ + public final boolean init( AbstractTask parent ) + { + if ( this.initDone ) { + LOG.fatal( "init() called twice on " + this.getClass().getSimpleName() ); + System.exit( 1 ); + } + this.parent = parent; + this.status = new TaskStatus( StatusCode.TASK_WAITING, this.id ); + this.initDone = true; + boolean ret; + try { + ret = this.initTask(); + } catch ( Throwable t ) { + ret = false; + } + if ( !ret ) { + this.status.statusCode = StatusCode.TASK_ERROR; + } + return ret; + } + + /** + * Your own initialization code. This is run synchronously within the network + * handling thread, so do NOT put anything here that might take longer than a few milliseconds! + * You should usually only validate the input to the task here, and return false if it is + * invalid. + * + * @return - true if the task should be executed by the scheduler (ie init was successful) + * - false if init was not successful, or you don't need to do any more processing + */ + protected abstract boolean initTask(); + + /** + * This is where you put your huge work stuff that takes ages. + */ + protected abstract boolean execute(); + + /* + * Final methods + */ + + /** + * Get id of parent task. + * + * @return id of parent task, null if no parent set + * + */ + public String getParentTaskId() + { + return this.parentTask; + } + + /** + * Set the custom status data object to be returned on status requests. + * For simple tasks it should be sufficient to set this once before executing + * the task starts, and then just update fields in that class while the + * task is running. For cases where you have complicated data structures or + * multiple values that need to stay in sync you could create a new instance + * every time, fill it with values, and then call this method. + * + * @param obj the object containing you specific task data + */ + protected final void setStatusObject( Object obj ) + { + status.setStatusObject( obj ); + } + + /** + * Get current status of task. + */ + public final TaskStatus getStatus() + { + if ( this.initDone && this.parentTask != null ) { + final StatusCode parentStatus = parent.getStatusCode(); + switch ( parentStatus ) { + case DUPLICATE_ID: + case NO_SUCH_INSTANCE: + case NO_SUCH_TASK: + case NO_SUCH_CONSTRUCTOR: + case PARENT_FAILED: + case TASK_ERROR: + if ( this.failOnParentFail ) + this.status.statusCode = StatusCode.PARENT_FAILED; + this.parentTask = null; + break; + default: + break; + } + } + return this.status; + } + + /** + * Get status code if task. + * + * @return + */ + public final StatusCode getStatusCode() + { + return getStatus().getStatusCode(); + } + + /** + * Get id of task + */ + public final String getId() + { + return this.id; + } + + /** + * Getter for failOnParentTask + */ + public final boolean getFailOnParentFail() + { + return this.failOnParentFail; + } + + /** + * Release the task: If the task is still pending/running, it will be removed from the task + * manager as soon as it finishes. So you can't query for its result later. If the task has + * already finished (either successful or failed), it will be removed (almost) immediately. + */ + public final void release() + { + this.removalDeadline = Math.min( this.removalDeadline, System.currentTimeMillis() + RELEASE_DELAY ); + } + + /** + * Can this task be started? This checks all the conditions: + * - Has this task been initialized yet? + * - Is it actually waiting for execution? + * - Does it depend on a parent task? + * -- If so, did the parent finish? + * -- Or did it fail? + * --- If so, should this task only run if it didn't fail? + * + * @return true iff this task can be started + */ + public final boolean canStart() + { + if ( !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) { + return false; + } + if ( this.parent == null ) + return true; + return parent.getStatusCode() != StatusCode.TASK_WAITING && parent.getStatusCode() != StatusCode.TASK_PROCESSING; + } + + /** + * Checks whether this task can be removed from the task manager. + * A task can be removed if it is not executing or waiting for execution, and + * its removal deadline has been reached. + * + * @return true if it can be released + */ + public final boolean canBeReleased() + { + if ( this.status.statusCode == StatusCode.TASK_WAITING || this.status.statusCode == StatusCode.TASK_PROCESSING ) + return false; + return this.removalDeadline != 0 && System.currentTimeMillis() > this.removalDeadline; + } + + /** + * Execute the task, wrapped in some sanity checks. + */ + @Override + public final void run() + { + synchronized ( this ) { + if ( this.status.statusCode != StatusCode.TASK_WAITING ) + throw new RuntimeException( "Tried to launch task " + this.getClass().getSimpleName() + " twice!" ); + if ( !this.initDone ) + throw new RuntimeException( "Tried to launch " + this.getClass().getSimpleName() + " without initializing it!" ); + this.status.statusCode = StatusCode.TASK_PROCESSING; + } + boolean ret; + try { + ret = execute(); + } catch ( Throwable t ) { + LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception: " + t.toString() ); + ret = false; + } + if ( ret ) { + this.status.statusCode = StatusCode.TASK_FINISHED; + } else { + this.status.statusCode = StatusCode.TASK_ERROR; + } + } + + /** + * This is called when a client requests the status of this task. In case you + * want to return complex structures like Lists, which are not thread safe, you + * might want to keep that list outside the status class you return, and only + * create a copy of it for the status class in this function. + * If you only return more or less atomic data, you don't need to override + * this function + */ + protected void updateStatus() + { + + } +} diff --git a/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java b/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java new file mode 100644 index 0000000..ea71ea5 --- /dev/null +++ b/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java @@ -0,0 +1,169 @@ +package org.openslx.taskmanager.api; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + +import org.apache.log4j.Logger; + +public abstract class SystemCommandTask extends AbstractTask +{ + + private static final Logger log = Logger.getLogger( SystemCommandTask.class ); + + private String[] command = null; + + private Process process = null; + + + @Override + protected final boolean execute() + { + command = initCommandLine(); + if ( command == null || command.length == 0 ) { + return processEnded( -1 ); + } + + ProcessBuilder pb = new ProcessBuilder( command ); + pb.directory( new File( "/" ) ); + + try { + + // Create process + process = pb.start(); + final Process p = process; + processStarted(); + p.getOutputStream(); + + // Read its stdout + Thread stdout = new Thread( new Runnable() { + @Override + public void run() + { + try { + BufferedReader reader = new BufferedReader( new InputStreamReader( p.getInputStream() ) ); + String line; + while ( ( line = reader.readLine() ) != null ) { + synchronized ( p ) { + processStdOut( line ); + } + } + } catch ( IOException e ) { + } + } + } ); + // Read its stderr + Thread stderr = new Thread( new Runnable() { + @Override + public void run() + { + try { + BufferedReader reader = new BufferedReader( new InputStreamReader( p.getErrorStream() ) ); + String line; + while ( ( line = reader.readLine() ) != null ) { + synchronized ( p ) { + processStdErr( line ); + } + } + } catch ( IOException e ) { + } + } + } ); + + stdout.start(); + stderr.start(); + + // Wait for everything + stdout.join(); + stderr.join(); + process.waitFor(); + + return processEnded( process.exitValue() ); + + } catch ( IOException e ) { + log.warn( "Process of task " + getId() + " died." ); + processStdErr( e.toString() ); + return processEnded( -2 ); + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + return false; + } catch ( Exception e ) { + log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() ); + processStdErr( e.toString() ); + return processEnded( -3 ); + } finally { + if ( process != null ) + process.destroy(); + } + } + + /** + * Write data to the process's stdin. + * + * @param data stuff to write + * @return success or failure mapped to a boolean in a really complicated way + */ + protected final boolean toStdIn(byte[] data) + { + try { + process.getOutputStream().write( data ); + } catch ( IOException e ) { + e.printStackTrace(); + return false; + } + return true; + } + + /** + * Write text to the process's stdin. + * + * @param text stuff to write + * @return success or failure mapped to a boolean in a really complicated way + */ + protected final boolean toStdIn(String text) + { + return toStdIn( text.getBytes( StandardCharsets.UTF_8 ) ); + } + + /** + * Called to get the command line. Each argument should be a separate array + * element. Returning null means the task should not run (as the arguments + * were probably faulty). + * + * @return List of arguments. First element is the command itself. + */ + protected abstract String[] initCommandLine(); + + /** + * Called when the process has been successfully started. + */ + protected void processStarted() + { + } + + /** + * Called when the process has finished running + * + * @param exitCode the process' exit code + * @return + */ + protected abstract boolean processEnded( int exitCode ); + + /** + * Called when a line has been read from the process' stdout. + * + * @param line The line read from the process, without any newline characters + */ + protected abstract void processStdOut( String line ); + + /** + * Called when a line has been read from the process' stderr. + * Trailing newline is removed. + * + * @param line The line read from the process, without any newline characters + */ + protected abstract void processStdErr( String line ); + +} diff --git a/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java new file mode 100644 index 0000000..f01ccc4 --- /dev/null +++ b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java @@ -0,0 +1,130 @@ +package org.openslx.taskmanager.api; + +/** + * This is what is returned on a status request. + * To return custom data for your task, call {@link AbstractTask#setStatusObject(Object)} from your + * Task. + * This class is serialized entirely, not using the Exposed annotation. + */ +public final class TaskStatus +{ + + public enum StatusCode + { + TASK_WAITING, + TASK_PROCESSING, + TASK_FINISHED, + TASK_ERROR, + NO_SUCH_INSTANCE, + NO_SUCH_TASK, + NO_SUCH_CONSTRUCTOR, + DUPLICATE_ID, + PARENT_FAILED, + JSON_ERROR + } + + /** + * Overall status of the task. Only set by base methods of the AbstractTask class. + */ + protected StatusCode statusCode; + /** + * Custom data a task might want to return on status requests. + */ + private Object data = null; + + @SuppressWarnings( "unused" ) + private final String id; + + /* + * Static members + */ + + /** + * Create a single "duplicate id" status we return if trying to launch a task with an id already + * in use. + */ + public static final TaskStatus ts_duplicateId = new TaskStatus( StatusCode.DUPLICATE_ID ); + /** + * Create a single "no such constructor" status we return if a task could be found, but not + * instantiated. + */ + public static final TaskStatus ts_noSuchConstructor = new TaskStatus( StatusCode.NO_SUCH_CONSTRUCTOR ); + /** + * Create a single "no such task" status we return if a task should be invoked that + * doesn't actually exist. + */ + public static final TaskStatus ts_noSuchTask = new TaskStatus( StatusCode.NO_SUCH_TASK ); + /** + * Create a single "no such task" status we return if an action on a task instance is requested + * that doesn't actually exist. + */ + public static final TaskStatus ts_noSuchInstance = new TaskStatus( StatusCode.NO_SUCH_INSTANCE ); + /** + * Create a single "parent failed" status we return as status for a task which depends on another + * task, and that other task failed to execute, or failed during execution. + */ + public static final TaskStatus ts_parentFailed = new TaskStatus( StatusCode.PARENT_FAILED ); + /** + * Create a single "task waiting" status we return as status for a task that is waiting for + * execution. + */ + public static final TaskStatus ts_waiting = new TaskStatus( StatusCode.TASK_WAITING ); + /** + * Create a single "task error" status we can use everywhere. + */ + public static final TaskStatus ts_error = new TaskStatus( StatusCode.TASK_ERROR ); + /** + * Create a single "json error" status we can use everywhere. + */ + public static final TaskStatus ts_jsonError = new TaskStatus( StatusCode.JSON_ERROR ); + + /** + * Create new TaskStatus with given initial status code + * and id. + * + * @param status The status code to initialize the TaskStatus with + * @param id id of task this status belongs to + */ + public TaskStatus( final StatusCode status, final String id ) + { + this.statusCode = status; + this.id = id; + } + + /** + * Create new TaskStatus with given initial status code. + * + * @param status The status code to initialize the TaskStatus with + */ + public TaskStatus( final StatusCode status ) + { + this( status, null ); + } + + /** + * Get the status code of this TaskStatus + * + * @return + */ + public final StatusCode getStatusCode() + { + return this.statusCode; + } + + /** + * Set the custom status data. + * + * @param obj custom status object + */ + protected void setStatusObject( Object obj ) + { + this.data = obj; + } + + public String getStatusObjectClassName() + { + if ( this.data == null ) return "(null)"; + return this.data.getClass().getSimpleName(); + } + +} -- cgit v1.2.3-55-g7522