From efb5ad9f5fe48a77b6cd14e7bd2b25e3b13ecb1f Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 3 Jun 2014 16:44:56 +0200 Subject: Initial commit --- .gitignore | 9 + api/pom.xml | 66 +++++ .../org/openslx/taskmanager/api/AbstractTask.java | 284 +++++++++++++++++++++ .../openslx/taskmanager/api/SystemCommandTask.java | 169 ++++++++++++ .../org/openslx/taskmanager/api/TaskStatus.java | 130 ++++++++++ daemon/pom.xml | 66 +++++ .../src/main/java/org/openslx/taskmanager/App.java | 47 ++++ .../java/org/openslx/taskmanager/Environment.java | 67 +++++ .../main/java/org/openslx/taskmanager/Global.java | 32 +++ .../org/openslx/taskmanager/main/Taskmanager.java | 200 +++++++++++++++ .../taskmanager/network/NetworkHandler.java | 180 +++++++++++++ .../openslx/taskmanager/network/RequestParser.java | 66 +++++ .../openslx/taskmanager/util/ClassLoaderHack.java | 66 +++++ .../java/org/openslx/taskmanager/util/Util.java | 24 ++ .../test/java/org/openslx/taskmanager/AppTest.java | 38 +++ pom.xml | 38 +++ 16 files changed, 1482 insertions(+) create mode 100644 .gitignore create mode 100644 api/pom.xml create mode 100644 api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java create mode 100644 api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java create mode 100644 api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java create mode 100644 daemon/pom.xml create mode 100644 daemon/src/main/java/org/openslx/taskmanager/App.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/Environment.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/Global.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/util/Util.java create mode 100644 daemon/src/test/java/org/openslx/taskmanager/AppTest.java create mode 100644 pom.xml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4beb56d --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.project +.settings/ +.classpath +*.swp +*~ +*.tmp +*.class +target/ + diff --git a/api/pom.xml b/api/pom.xml new file mode 100644 index 0000000..48a9beb --- /dev/null +++ b/api/pom.xml @@ -0,0 +1,66 @@ + + 4.0.0 + org.openslx.taskmanager + taskmanager-api + jar + 1.0-SNAPSHOT + taskmanager-api + http://maven.apache.org + + + UTF-8 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.7 + 1.7 + + + + maven-assembly-plugin + + + package + + single + + + + + + jar-with-dependencies + + + + + + + + + log4j + log4j + 1.2.17 + compile + + + org.slf4j + slf4j-log4j12 + 1.7.5 + compile + + + com.google.code.gson + gson + 2.2.4 + compile + + + + diff --git a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java new file mode 100644 index 0000000..cc837ba --- /dev/null +++ b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java @@ -0,0 +1,284 @@ +package org.openslx.taskmanager.api; + +import java.util.UUID; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.api.TaskStatus.StatusCode; + +import com.google.gson.annotations.Expose; + +public abstract class AbstractTask implements Runnable +{ + + private static final long RELEASE_DELAY = 5l * 60l * 1000l; + private static final Logger LOG = Logger.getLogger( AbstractTask.class ); + + /* + * To be set from task invocation (json data) + */ + + /** + * The id of the task instance. + */ + @Expose + private String id = null; + /** + * Parent task. This task won't be started as long as the parent task currently + * waiting for execution or is being executed. Otherwise this task is available for execution. + * Note that MAX_INSTANCES is still being taken into account. Set to null to ignore. + */ + @Expose + private String parentTask = null; + /** + * If the parent task failed to execute, don't run this task and fail immediately. + */ + @Expose + private boolean failOnParentFail = true; + + /* + * Variables we're working with - these should never be set from incoming (json) data + */ + + /** + * Maximum age of a task, if it's not freed explicitly. + */ + private static final long MAX_TASK_AGE = 24l * 3600l * 1000l; + /** + * timeMillis when this task will be removed. This will be set automatically to something + * reasonable like 24 hours, once the job has finished. This is to prevent clogging the task + * manager with finished jobs over time. Note that you can explicitly remove a job using the + * "release" command. + */ + private volatile long removalDeadline = System.currentTimeMillis() + MAX_TASK_AGE; + /** + * True if init() has been called. Task will not start if this is false. + */ + private volatile boolean initDone = false; + /** + * Status of Task + */ + private TaskStatus status = TaskStatus.ts_waiting; + /** + * Reference to parent task + */ + private AbstractTask parent = null; + + /** + * Default constructor which should not be overridden + */ + public AbstractTask() + { + this.id = UUID.randomUUID().toString(); + } + + /* + * Overridable methods + */ + + /** + * Initialize the task; method used by the {@link Taskmanager}. + * Put your own initialization code in initTask() + */ + public final boolean init( AbstractTask parent ) + { + if ( this.initDone ) { + LOG.fatal( "init() called twice on " + this.getClass().getSimpleName() ); + System.exit( 1 ); + } + this.parent = parent; + this.status = new TaskStatus( StatusCode.TASK_WAITING, this.id ); + this.initDone = true; + boolean ret; + try { + ret = this.initTask(); + } catch ( Throwable t ) { + ret = false; + } + if ( !ret ) { + this.status.statusCode = StatusCode.TASK_ERROR; + } + return ret; + } + + /** + * Your own initialization code. This is run synchronously within the network + * handling thread, so do NOT put anything here that might take longer than a few milliseconds! + * You should usually only validate the input to the task here, and return false if it is + * invalid. + * + * @return - true if the task should be executed by the scheduler (ie init was successful) + * - false if init was not successful, or you don't need to do any more processing + */ + protected abstract boolean initTask(); + + /** + * This is where you put your huge work stuff that takes ages. + */ + protected abstract boolean execute(); + + /* + * Final methods + */ + + /** + * Get id of parent task. + * + * @return id of parent task, null if no parent set + * + */ + public String getParentTaskId() + { + return this.parentTask; + } + + /** + * Set the custom status data object to be returned on status requests. + * For simple tasks it should be sufficient to set this once before executing + * the task starts, and then just update fields in that class while the + * task is running. For cases where you have complicated data structures or + * multiple values that need to stay in sync you could create a new instance + * every time, fill it with values, and then call this method. + * + * @param obj the object containing you specific task data + */ + protected final void setStatusObject( Object obj ) + { + status.setStatusObject( obj ); + } + + /** + * Get current status of task. + */ + public final TaskStatus getStatus() + { + if ( this.initDone && this.parentTask != null ) { + final StatusCode parentStatus = parent.getStatusCode(); + switch ( parentStatus ) { + case DUPLICATE_ID: + case NO_SUCH_INSTANCE: + case NO_SUCH_TASK: + case NO_SUCH_CONSTRUCTOR: + case PARENT_FAILED: + case TASK_ERROR: + if ( this.failOnParentFail ) + this.status.statusCode = StatusCode.PARENT_FAILED; + this.parentTask = null; + break; + default: + break; + } + } + return this.status; + } + + /** + * Get status code if task. + * + * @return + */ + public final StatusCode getStatusCode() + { + return getStatus().getStatusCode(); + } + + /** + * Get id of task + */ + public final String getId() + { + return this.id; + } + + /** + * Getter for failOnParentTask + */ + public final boolean getFailOnParentFail() + { + return this.failOnParentFail; + } + + /** + * Release the task: If the task is still pending/running, it will be removed from the task + * manager as soon as it finishes. So you can't query for its result later. If the task has + * already finished (either successful or failed), it will be removed (almost) immediately. + */ + public final void release() + { + this.removalDeadline = Math.min( this.removalDeadline, System.currentTimeMillis() + RELEASE_DELAY ); + } + + /** + * Can this task be started? This checks all the conditions: + * - Has this task been initialized yet? + * - Is it actually waiting for execution? + * - Does it depend on a parent task? + * -- If so, did the parent finish? + * -- Or did it fail? + * --- If so, should this task only run if it didn't fail? + * + * @return true iff this task can be started + */ + public final boolean canStart() + { + if ( !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) { + return false; + } + if ( this.parent == null ) + return true; + return parent.getStatusCode() != StatusCode.TASK_WAITING && parent.getStatusCode() != StatusCode.TASK_PROCESSING; + } + + /** + * Checks whether this task can be removed from the task manager. + * A task can be removed if it is not executing or waiting for execution, and + * its removal deadline has been reached. + * + * @return true if it can be released + */ + public final boolean canBeReleased() + { + if ( this.status.statusCode == StatusCode.TASK_WAITING || this.status.statusCode == StatusCode.TASK_PROCESSING ) + return false; + return this.removalDeadline != 0 && System.currentTimeMillis() > this.removalDeadline; + } + + /** + * Execute the task, wrapped in some sanity checks. + */ + @Override + public final void run() + { + synchronized ( this ) { + if ( this.status.statusCode != StatusCode.TASK_WAITING ) + throw new RuntimeException( "Tried to launch task " + this.getClass().getSimpleName() + " twice!" ); + if ( !this.initDone ) + throw new RuntimeException( "Tried to launch " + this.getClass().getSimpleName() + " without initializing it!" ); + this.status.statusCode = StatusCode.TASK_PROCESSING; + } + boolean ret; + try { + ret = execute(); + } catch ( Throwable t ) { + LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception: " + t.toString() ); + ret = false; + } + if ( ret ) { + this.status.statusCode = StatusCode.TASK_FINISHED; + } else { + this.status.statusCode = StatusCode.TASK_ERROR; + } + } + + /** + * This is called when a client requests the status of this task. In case you + * want to return complex structures like Lists, which are not thread safe, you + * might want to keep that list outside the status class you return, and only + * create a copy of it for the status class in this function. + * If you only return more or less atomic data, you don't need to override + * this function + */ + protected void updateStatus() + { + + } +} diff --git a/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java b/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java new file mode 100644 index 0000000..ea71ea5 --- /dev/null +++ b/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java @@ -0,0 +1,169 @@ +package org.openslx.taskmanager.api; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + +import org.apache.log4j.Logger; + +public abstract class SystemCommandTask extends AbstractTask +{ + + private static final Logger log = Logger.getLogger( SystemCommandTask.class ); + + private String[] command = null; + + private Process process = null; + + + @Override + protected final boolean execute() + { + command = initCommandLine(); + if ( command == null || command.length == 0 ) { + return processEnded( -1 ); + } + + ProcessBuilder pb = new ProcessBuilder( command ); + pb.directory( new File( "/" ) ); + + try { + + // Create process + process = pb.start(); + final Process p = process; + processStarted(); + p.getOutputStream(); + + // Read its stdout + Thread stdout = new Thread( new Runnable() { + @Override + public void run() + { + try { + BufferedReader reader = new BufferedReader( new InputStreamReader( p.getInputStream() ) ); + String line; + while ( ( line = reader.readLine() ) != null ) { + synchronized ( p ) { + processStdOut( line ); + } + } + } catch ( IOException e ) { + } + } + } ); + // Read its stderr + Thread stderr = new Thread( new Runnable() { + @Override + public void run() + { + try { + BufferedReader reader = new BufferedReader( new InputStreamReader( p.getErrorStream() ) ); + String line; + while ( ( line = reader.readLine() ) != null ) { + synchronized ( p ) { + processStdErr( line ); + } + } + } catch ( IOException e ) { + } + } + } ); + + stdout.start(); + stderr.start(); + + // Wait for everything + stdout.join(); + stderr.join(); + process.waitFor(); + + return processEnded( process.exitValue() ); + + } catch ( IOException e ) { + log.warn( "Process of task " + getId() + " died." ); + processStdErr( e.toString() ); + return processEnded( -2 ); + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + return false; + } catch ( Exception e ) { + log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() ); + processStdErr( e.toString() ); + return processEnded( -3 ); + } finally { + if ( process != null ) + process.destroy(); + } + } + + /** + * Write data to the process's stdin. + * + * @param data stuff to write + * @return success or failure mapped to a boolean in a really complicated way + */ + protected final boolean toStdIn(byte[] data) + { + try { + process.getOutputStream().write( data ); + } catch ( IOException e ) { + e.printStackTrace(); + return false; + } + return true; + } + + /** + * Write text to the process's stdin. + * + * @param text stuff to write + * @return success or failure mapped to a boolean in a really complicated way + */ + protected final boolean toStdIn(String text) + { + return toStdIn( text.getBytes( StandardCharsets.UTF_8 ) ); + } + + /** + * Called to get the command line. Each argument should be a separate array + * element. Returning null means the task should not run (as the arguments + * were probably faulty). + * + * @return List of arguments. First element is the command itself. + */ + protected abstract String[] initCommandLine(); + + /** + * Called when the process has been successfully started. + */ + protected void processStarted() + { + } + + /** + * Called when the process has finished running + * + * @param exitCode the process' exit code + * @return + */ + protected abstract boolean processEnded( int exitCode ); + + /** + * Called when a line has been read from the process' stdout. + * + * @param line The line read from the process, without any newline characters + */ + protected abstract void processStdOut( String line ); + + /** + * Called when a line has been read from the process' stderr. + * Trailing newline is removed. + * + * @param line The line read from the process, without any newline characters + */ + protected abstract void processStdErr( String line ); + +} diff --git a/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java new file mode 100644 index 0000000..f01ccc4 --- /dev/null +++ b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java @@ -0,0 +1,130 @@ +package org.openslx.taskmanager.api; + +/** + * This is what is returned on a status request. + * To return custom data for your task, call {@link AbstractTask#setStatusObject(Object)} from your + * Task. + * This class is serialized entirely, not using the Exposed annotation. + */ +public final class TaskStatus +{ + + public enum StatusCode + { + TASK_WAITING, + TASK_PROCESSING, + TASK_FINISHED, + TASK_ERROR, + NO_SUCH_INSTANCE, + NO_SUCH_TASK, + NO_SUCH_CONSTRUCTOR, + DUPLICATE_ID, + PARENT_FAILED, + JSON_ERROR + } + + /** + * Overall status of the task. Only set by base methods of the AbstractTask class. + */ + protected StatusCode statusCode; + /** + * Custom data a task might want to return on status requests. + */ + private Object data = null; + + @SuppressWarnings( "unused" ) + private final String id; + + /* + * Static members + */ + + /** + * Create a single "duplicate id" status we return if trying to launch a task with an id already + * in use. + */ + public static final TaskStatus ts_duplicateId = new TaskStatus( StatusCode.DUPLICATE_ID ); + /** + * Create a single "no such constructor" status we return if a task could be found, but not + * instantiated. + */ + public static final TaskStatus ts_noSuchConstructor = new TaskStatus( StatusCode.NO_SUCH_CONSTRUCTOR ); + /** + * Create a single "no such task" status we return if a task should be invoked that + * doesn't actually exist. + */ + public static final TaskStatus ts_noSuchTask = new TaskStatus( StatusCode.NO_SUCH_TASK ); + /** + * Create a single "no such task" status we return if an action on a task instance is requested + * that doesn't actually exist. + */ + public static final TaskStatus ts_noSuchInstance = new TaskStatus( StatusCode.NO_SUCH_INSTANCE ); + /** + * Create a single "parent failed" status we return as status for a task which depends on another + * task, and that other task failed to execute, or failed during execution. + */ + public static final TaskStatus ts_parentFailed = new TaskStatus( StatusCode.PARENT_FAILED ); + /** + * Create a single "task waiting" status we return as status for a task that is waiting for + * execution. + */ + public static final TaskStatus ts_waiting = new TaskStatus( StatusCode.TASK_WAITING ); + /** + * Create a single "task error" status we can use everywhere. + */ + public static final TaskStatus ts_error = new TaskStatus( StatusCode.TASK_ERROR ); + /** + * Create a single "json error" status we can use everywhere. + */ + public static final TaskStatus ts_jsonError = new TaskStatus( StatusCode.JSON_ERROR ); + + /** + * Create new TaskStatus with given initial status code + * and id. + * + * @param status The status code to initialize the TaskStatus with + * @param id id of task this status belongs to + */ + public TaskStatus( final StatusCode status, final String id ) + { + this.statusCode = status; + this.id = id; + } + + /** + * Create new TaskStatus with given initial status code. + * + * @param status The status code to initialize the TaskStatus with + */ + public TaskStatus( final StatusCode status ) + { + this( status, null ); + } + + /** + * Get the status code of this TaskStatus + * + * @return + */ + public final StatusCode getStatusCode() + { + return this.statusCode; + } + + /** + * Set the custom status data. + * + * @param obj custom status object + */ + protected void setStatusObject( Object obj ) + { + this.data = obj; + } + + public String getStatusObjectClassName() + { + if ( this.data == null ) return "(null)"; + return this.data.getClass().getSimpleName(); + } + +} diff --git a/daemon/pom.xml b/daemon/pom.xml new file mode 100644 index 0000000..f25d610 --- /dev/null +++ b/daemon/pom.xml @@ -0,0 +1,66 @@ + + 4.0.0 + org.openslx.taskmanager + taskmanager-daemon + jar + 1.0-SNAPSHOT + taskmanager-daemon + http://maven.apache.org + + + UTF-8 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.7 + 1.7 + + + + maven-assembly-plugin + + + package + + single + + + + + + + org.openslx.taskmanager.App + + + + jar-with-dependencies + + + + + + + + + junit + junit + 3.8.1 + test + + + org.openslx.taskmanager + taskmanager-api + ${project.version} + compile + + + + + diff --git a/daemon/src/main/java/org/openslx/taskmanager/App.java b/daemon/src/main/java/org/openslx/taskmanager/App.java new file mode 100644 index 0000000..c233229 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/App.java @@ -0,0 +1,47 @@ +package org.openslx.taskmanager; + +import java.io.File; +import java.io.IOException; +import java.net.SocketException; + +import junit.runner.ClassPathTestCollector; + +import org.apache.log4j.BasicConfigurator; +import org.openslx.taskmanager.main.Taskmanager; +import org.openslx.taskmanager.network.NetworkHandler; +import org.openslx.taskmanager.util.ClassLoaderHack; + +/** + * Hello world! + * + */ +public class App +{ + + public static void main( String[] args ) throws SocketException + { + // Load all task plugins + File folder = new File( "./plugins" ); + if ( !folder.exists() ) { + System.out.println( "No plugin folder found - nothing to do." ); + System.exit( 1 ); + } + for ( File file : folder.listFiles() ) { + if ( !file.isFile() || !file.toString().endsWith( ".jar" ) ) + continue; + try { + ClassLoaderHack.addFile( file ); + } catch ( IOException e ) { + e.printStackTrace(); + System.out.println( "Could not add plugin: " + file.toString() ); + System.exit( 1 ); + } + } + BasicConfigurator.configure(); + Environment.load( "config/environment" ); + NetworkHandler.init(); + Taskmanager.run(); + // Wait for everything + NetworkHandler.join(); + } +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/Environment.java b/daemon/src/main/java/org/openslx/taskmanager/Environment.java new file mode 100644 index 0000000..acbfad4 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/Environment.java @@ -0,0 +1,67 @@ +package org.openslx.taskmanager; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.log4j.Logger; + +/** + * Holds the environment that tasks running a system command *should* + * use. The environment is read from a config file. + */ +public class Environment +{ + + private static final Logger log = Logger.getLogger( Environment.class ); + + private static Map env = new LinkedHashMap<>(); + + public static boolean load( String fileName ) + { + try { + FileReader fileReader = new FileReader( fileName ); + BufferedReader bufferedReader = new BufferedReader( fileReader ); + + Map env = new LinkedHashMap<>(); + String line = null; + while ( ( line = bufferedReader.readLine() ) != null ) { + if ( !line.matches( "^[a-zA-Z0-9_]+=" ) ) + continue; + String[] part = line.split( "=", 2 ); + env.put( part[0], part[1] ); + } + + bufferedReader.close(); + + Environment.env = env; + log.info( "Loaded " + env.size() + " environment lines." ); + } catch ( IOException e ) { + log.info( "Could not load environment definition from " + fileName + ". Processes might use the same environment as this thread." ); + return false; + } + return true; + } + + public static void set( Map environment ) + { + environment.clear(); + environment.putAll( env ); + } + + public static String[] get() + { + // Get reference to env so it doesn't change while in this function (load() from other thread) + Map env = Environment.env; + String ret[] = new String[ env.size() ]; + int i = 0; + for ( Entry it : env.entrySet() ) { + ret[i++] = it.getKey() + "=" + it.getValue(); + } + return ret; + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/Global.java b/daemon/src/main/java/org/openslx/taskmanager/Global.java new file mode 100644 index 0000000..7ca2c2d --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/Global.java @@ -0,0 +1,32 @@ +package org.openslx.taskmanager; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class Global +{ + + public static final int LISTEN_PORT = 9215; + + public static final String TASK_PACKAGE_NAME = "org.openslx.taskmanager.tasks"; + + public static final long MAX_TASK_AGE = 24l * 3600l * 1000l; + + public static final InetAddress LISTEN_ADDRESS; + + public static volatile boolean doShutdown = false; + + static + { + InetAddress la; + try { + la = Inet4Address.getByName( "127.0.0.1" ); + } catch ( UnknownHostException e ) { + la = null; + e.printStackTrace(); + } + LISTEN_ADDRESS = la; + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java new file mode 100644 index 0000000..f3707ed --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java @@ -0,0 +1,200 @@ +package org.openslx.taskmanager.main; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.Global; +import org.openslx.taskmanager.api.AbstractTask; +import org.openslx.taskmanager.api.TaskStatus; +import org.openslx.taskmanager.util.ClassLoaderHack; +import org.openslx.taskmanager.util.Util; + +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; + +public class Taskmanager +{ + + private static final Logger log = Logger.getLogger( Taskmanager.class ); + + private static final ExecutorService threadPool = Executors.newCachedThreadPool(); + + /** + * Static gson object for (de)serialization + */ + private static final Gson gson = Util.explicitGsonInstance(); + + /** + * Cache of known tasks + */ + private static final Map> tasks = new ConcurrentHashMap<>(); + + /** + * All the running/finished task instances. The mainloop will call wait() on this and this object + * is notified as soon as the mainloop should check if there is any task available that can be + * run. + */ + private static final Map instances = new ConcurrentHashMap<>(); + + private static final Lock workLock = new ReentrantLock(); + private static final Condition doCheckForWork = workLock.newCondition(); + + /* + * Static methods + */ + + /** + * Return the status of the task with the given ID. If the task does not + * exist, a pseudo-status instance is returned, with a status code of + * NO_SUCH_TASK. This means it is guaranteed that this function never returns + * null. + * + * @param taskId - ID of the task to retrieve the status of + * @return TaskStatus + */ + public static TaskStatus getTaskStatus( final String taskId ) + { + AbstractTask task = instances.get( taskId ); + if ( task == null ) + return TaskStatus.ts_noSuchInstance; + return task.getStatus(); + } + + /** + * Run the requested method as a new task. The json data may contain an explicit id for that + * task, otherwise a random id is generated. If there's already a task running with the desired + * id, an error is returned, and no new task will be created. The desired id has to be added to + * the json data, as a field called "id". + * + * @param task - The task name to be executed. + * @param jsonData - JsonData to be passed to the task. All fields except "id" are ignored by the + * task manager. + * @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if + * there is no task registered under the given name. + */ + public static TaskStatus submitTask( final String task, final String jsonData ) + { + // Get task class + Class taskClass; + synchronized ( tasks ) { + taskClass = tasks.get( task ); + if ( taskClass == null ) { // Not in map; either never called yet, or doesn't exist + taskClass = ClassLoaderHack.getClass( Global.TASK_PACKAGE_NAME, task, AbstractTask.class ); + if ( taskClass == null ) { // Simply doesn't exist + log.warn( "Could not find " + task + " in " + Global.TASK_PACKAGE_NAME ); + return TaskStatus.ts_noSuchTask; + } + tasks.put( task, taskClass ); // Cache for all future calls + } + } + // Instantiate using Gson + final AbstractTask taskInstance; + try { + taskInstance = gson.fromJson( jsonData, taskClass ); + } catch ( JsonSyntaxException e ) { + log.warn( "Invocation request for " + task + " with invalid json: " + jsonData ); + return TaskStatus.ts_jsonError; + } + if ( taskInstance == null ) { + log.warn( task + " exists, but could not be instanciated!" ); + return TaskStatus.ts_noSuchConstructor; + } + if ( taskInstance.getId() == null ) { + log.warn( "Tried to launch " + task + " with null-id" ); + return TaskStatus.ts_noSuchConstructor; + } + // Now check for id collision + synchronized ( instances ) { + if ( instances.containsKey( taskInstance.getId() ) ) { + log.info( "Ignoring task invocation of " + task + ": Duplicate ID: " + taskInstance.getId() ); + return TaskStatus.ts_duplicateId; + } + instances.put( taskInstance.getId(), taskInstance ); + } + AbstractTask parent = null; + if ( taskInstance.getParentTaskId() != null ) + parent = instances.get( taskInstance.getParentTaskId() ); + if ( taskInstance.init( parent ) ) { + checkForWork(); + } + return taskInstance.getStatus(); + } + + public static void releaseTask( String taskId ) + { + final AbstractTask task = instances.get( taskId ); + if ( task != null ) + task.release(); + } + + /** + * Wakes up the Taskmanager's mainloop so it will check if any of the current task instances + * is waiting for execution. + */ + protected static void checkForWork() + { + workLock.lock(); + try { + doCheckForWork.signalAll(); + } finally { + workLock.unlock(); + } + } + + public static void run() + { + try { + while ( !Global.doShutdown ) { + workLock.lock(); + try { + doCheckForWork.await( 1, TimeUnit.MINUTES ); + } finally { + workLock.unlock(); + } + try { + for ( Iterator it = instances.values().iterator(); it.hasNext(); ) { + AbstractTask task = it.next(); + if ( task.canBeReleased() ) { + it.remove(); + log.debug( "Released task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" ); + continue; + } + if ( task.canStart() ) { + threadPool.execute( task ); + log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" ); + } + } + } catch ( RejectedExecutionException e ) { + log.warn( "ThreadPool rejected a task (" + e.getMessage() + ")" ); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + log.info( "Taskmanager mainloop finished." ); + Global.doShutdown = true; + log.info( "Shutting down worker thread pool...." ); + threadPool.shutdown(); + try { + if ( threadPool.awaitTermination( 5, TimeUnit.MINUTES ) ) { + log.info( "Thread pool shut down!" ); + } else { + log.info( "Trying to kill still running tasks...." ); + threadPool.shutdownNow(); + } + } catch ( InterruptedException e ) { + log.info( "Interrupted!" ); + } + } + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java new file mode 100644 index 0000000..3e2c8fd --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java @@ -0,0 +1,180 @@ +package org.openslx.taskmanager.network; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.Global; + +/** + * The network listener that will receive incoming UDP packets, try to process + * them, and then send a reply. + */ +public class NetworkHandler implements Runnable +{ + + private static final Logger log = Logger.getLogger( NetworkHandler.class ); + + // Static part + + private static Thread recvThread = null; + private static Thread sendThread = null; + /** + * Sender instance (Runnable handling outgoing packets) + */ + private static Sender sender = null; + /** + * UDP socket for sending and receiving. + */ + private static DatagramSocket socket; + + /** + * Initialize the NetworkHandler by starting threads and opening the socket. + */ + public static void init() throws SocketException + { + if ( recvThread != null ) + throw new RuntimeException( "Already initialized" ); + socket = new DatagramSocket( Global.LISTEN_PORT, Global.LISTEN_ADDRESS ); + recvThread = new Thread( new NetworkHandler() ); + recvThread.start(); + sendThread = new Thread( sender = new Sender() ); + sendThread.start(); + } + + public static void shutdown() + { + socket.close(); + } + + public static void join() + { + try { + recvThread.join(); + sendThread.join(); + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } + } + + // Class part + + /** + * Prepare and enqueue reply for client request. + * Only ever to be called from the receiving thread. The reply message is crafted + * and then handed over to the sending thread. + * + * @param destination SocketAddress of the client + * @param messageId The same ID the client used in it's request. + * It's echoed back to the client to enable request bursts, and has no meaning for the + * server. + * @param status A TaskStatus instance to be serialized to json and sent to the client. + */ + private void send( SocketAddress destination, byte[] buffer ) + { + final DatagramPacket packet; + try { + packet = new DatagramPacket( buffer, buffer.length, destination ); + } catch ( SocketException e ) { + log.warn( "Could not construct datagram packet for target " + destination.toString() ); + e.printStackTrace(); + return; + } + sender.send( packet ); + } + + /** + * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode + */ + @Override + public void run() + { + byte readBuffer[] = new byte[ 66000 ]; + try { + while ( !Global.doShutdown ) { + DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length ); + try { + socket.receive( packet ); + } catch ( IOException e ) { + log.info( "IOException on UDP socket when reading: " + e.getMessage() ); + Thread.sleep( 100 ); + continue; + } + if ( packet.getLength() < 2 ) { + log.debug( "Message too short" ); + continue; + } + String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 ); + try { + byte[] reply = RequestParser.handle( payload ); + if ( reply != null ) + send( packet.getSocketAddress(), reply ); + } catch ( Throwable t ) { + log.error( "Exception in RequestParser: " + t.getMessage() ); + t.printStackTrace(); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + Global.doShutdown = true; + log.info( "UDP receiver finished." ); + } + } + + /** + * Private sending thread. + * Use blocking queue, wait for packet to be added to it, then try to send. + */ + static class Sender implements Runnable + { + + /** + * Queue to stuff outgoing packets into. + */ + private final BlockingQueue queue = new LinkedBlockingQueue<>( 128 ); + + /** + * Wait until something is put into the queue, then send it. + */ + @Override + public void run() + { + try { + while ( !Global.doShutdown ) { + final DatagramPacket packet; + packet = queue.take(); + try { + socket.send( packet ); + } catch ( IOException e ) { + log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString() ); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + Global.doShutdown = true; + log.info( "UDP sender finished." ); + } + } + + /** + * Add something to the outgoing packet queue. + * Called from the receiving thread. + */ + public void send( DatagramPacket packet ) + { + if ( queue.offer( packet ) ) + return; + log.warn( "Could not add packet to queue: Full" ); + } + + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java new file mode 100644 index 0000000..d2cfb21 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java @@ -0,0 +1,66 @@ +package org.openslx.taskmanager.network; + +import java.nio.charset.StandardCharsets; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.api.TaskStatus; +import org.openslx.taskmanager.main.Taskmanager; + +import com.google.gson.Gson; + +public class RequestParser +{ + private static final Logger log = Logger.getLogger( RequestParser.class ); + + /** + * Our very own gson instance (for serializing replies) + */ + private static final Gson sendGson = new Gson(); + + /** + * Handle the given unparsed request. + * + * @param source source of the request, where the reply will be send to (if any) + * @param payload Packet data received from network, already converted to a string + */ + public static byte[] handle( String payload ) + { + String[] parts = payload.split( " *, *", 3 ); + // Message format is ", , " + if ( parts.length != 3 ) { + log.debug( "Could not split message" ); + return null; + } + // Look at parts[1], if it's "status" it's a request for the task + // with the ID given in parts[2] + if ( parts[1].equals( "status" ) ) { + TaskStatus status = Taskmanager.getTaskStatus( parts[2] ); + return serialize( parts[0], status ); + } + // Now check if parts[1] is "release" + if ( parts[1].equals( "release" ) ) { + Taskmanager.releaseTask( parts[2] ); + return null; + } + // Anything else in parts[0] will be treated as a fresh task invocation, so let's + // pass it on to the task manager. + TaskStatus status = Taskmanager.submitTask( parts[1], parts[2] ); + return serialize( parts[0], status ); + } + + private static byte[] serialize( String messageId, TaskStatus status ) + { + String data; + try { + synchronized ( sendGson ) { + data = sendGson.toJson( status ); + } + } catch ( Throwable e ) { + log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName() ); + log.warn( e.toString() ); + return null; + } + return ( messageId + ',' + data ).getBytes( StandardCharsets.UTF_8 ); + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java b/daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java new file mode 100644 index 0000000..1a02ff7 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java @@ -0,0 +1,66 @@ +package org.openslx.taskmanager.util; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; + +public class ClassLoaderHack +{ + @SuppressWarnings( "rawtypes" ) + private static final Class[] parameters = new Class[] { URL.class }; + + public static void addFile( String s ) throws IOException + { + File f = new File( s ); + addFile( f ); + } + + public static void addFile( File f ) throws IOException + { + addURL( f.toURI().toURL() ); + } + + public static void addURL( URL u ) throws IOException + { + URLClassLoader sysloader = (URLClassLoader)ClassLoader.getSystemClassLoader(); + Class sysclass = URLClassLoader.class; + + try { + Method method = sysclass.getDeclaredMethod( "addURL", parameters ); + method.setAccessible( true ); + method.invoke( sysloader, new Object[] { u } ); + System.out.println( "Loaded " + u.toString() ); + } catch ( Throwable t ) { + t.printStackTrace(); + throw new IOException( "Error, could not add URL to system classloader" ); + } + + } + + /** + * Get Class meta-object for given class in package. Only return class if it's somehow + * extending from given baseClass. + * + * @param packageName package to search in + * @param className name of class to look for + * @param baseClass class the class in question has to be extended from + * @return class meta object, or null if not found + */ + @SuppressWarnings( "unchecked" ) + public static Class getClass( String packageName, String className, Class baseClass ) + { + final Class clazz; + try { + clazz = Class.forName( packageName + '.' + className, true, ClassLoader.getSystemClassLoader() ); + } catch ( ClassNotFoundException e ) { + return null; + } + if ( clazz == null || ( baseClass != null && !baseClass.isAssignableFrom( clazz ) ) ) { + return null; + } + return (Class)clazz; + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/util/Util.java b/daemon/src/main/java/org/openslx/taskmanager/util/Util.java new file mode 100644 index 0000000..bf52ecb --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/util/Util.java @@ -0,0 +1,24 @@ +package org.openslx.taskmanager.util; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +public class Util +{ + + private static GsonBuilder gsonBuilder = new GsonBuilder(); + + /** + * Small helper to create a gson instance that will only handle class members with the + * "@Exposed" annotation. Decided against the default of explicitly excluding fields by + * making them transient, as you might easily forget to exclude an important field, which + * can in turn be a security issue. + * + * @return Gson instance + */ + public static Gson explicitGsonInstance() + { + return gsonBuilder.excludeFieldsWithoutExposeAnnotation().create(); + } + +} diff --git a/daemon/src/test/java/org/openslx/taskmanager/AppTest.java b/daemon/src/test/java/org/openslx/taskmanager/AppTest.java new file mode 100644 index 0000000..feac8e7 --- /dev/null +++ b/daemon/src/test/java/org/openslx/taskmanager/AppTest.java @@ -0,0 +1,38 @@ +package org.openslx.taskmanager; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..c07ee72 --- /dev/null +++ b/pom.xml @@ -0,0 +1,38 @@ + + + 4.0.0 + + org.openslx.taskmanager + taskmanager + 1.0-SNAPSHOT + pom + + taskmanager + http://google.de/ + + + daemon + api + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.7 + 1.7 + + + + + + + UTF-8 + + + + -- cgit v1.2.3-55-g7522