From efb5ad9f5fe48a77b6cd14e7bd2b25e3b13ecb1f Mon Sep 17 00:00:00 2001
From: Simon Rettberg
Date: Tue, 3 Jun 2014 16:44:56 +0200
Subject: Initial commit
---
.gitignore | 9 +
api/pom.xml | 66 +++++
.../org/openslx/taskmanager/api/AbstractTask.java | 284 +++++++++++++++++++++
.../openslx/taskmanager/api/SystemCommandTask.java | 169 ++++++++++++
.../org/openslx/taskmanager/api/TaskStatus.java | 130 ++++++++++
daemon/pom.xml | 66 +++++
.../src/main/java/org/openslx/taskmanager/App.java | 47 ++++
.../java/org/openslx/taskmanager/Environment.java | 67 +++++
.../main/java/org/openslx/taskmanager/Global.java | 32 +++
.../org/openslx/taskmanager/main/Taskmanager.java | 200 +++++++++++++++
.../taskmanager/network/NetworkHandler.java | 180 +++++++++++++
.../openslx/taskmanager/network/RequestParser.java | 66 +++++
.../openslx/taskmanager/util/ClassLoaderHack.java | 66 +++++
.../java/org/openslx/taskmanager/util/Util.java | 24 ++
.../test/java/org/openslx/taskmanager/AppTest.java | 38 +++
pom.xml | 38 +++
16 files changed, 1482 insertions(+)
create mode 100644 .gitignore
create mode 100644 api/pom.xml
create mode 100644 api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
create mode 100644 api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java
create mode 100644 api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java
create mode 100644 daemon/pom.xml
create mode 100644 daemon/src/main/java/org/openslx/taskmanager/App.java
create mode 100644 daemon/src/main/java/org/openslx/taskmanager/Environment.java
create mode 100644 daemon/src/main/java/org/openslx/taskmanager/Global.java
create mode 100644 daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
create mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java
create mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
create mode 100644 daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java
create mode 100644 daemon/src/main/java/org/openslx/taskmanager/util/Util.java
create mode 100644 daemon/src/test/java/org/openslx/taskmanager/AppTest.java
create mode 100644 pom.xml
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4beb56d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,9 @@
+.project
+.settings/
+.classpath
+*.swp
+*~
+*.tmp
+*.class
+target/
+
diff --git a/api/pom.xml b/api/pom.xml
new file mode 100644
index 0000000..48a9beb
--- /dev/null
+++ b/api/pom.xml
@@ -0,0 +1,66 @@
+
+ 4.0.0
+ org.openslx.taskmanager
+ taskmanager-api
+ jar
+ 1.0-SNAPSHOT
+ taskmanager-api
+ http://maven.apache.org
+
+
+ UTF-8
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+
+ 1.7
+
+
+
+ maven-assembly-plugin
+
+
+ package
+
+ single
+
+
+
+
+
+ jar-with-dependencies
+
+
+
+
+
+
+
+
+ log4j
+ log4j
+ 1.2.17
+ compile
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.5
+ compile
+
+
+ com.google.code.gson
+ gson
+ 2.2.4
+ compile
+
+
+
+
diff --git a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
new file mode 100644
index 0000000..cc837ba
--- /dev/null
+++ b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
@@ -0,0 +1,284 @@
+package org.openslx.taskmanager.api;
+
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.openslx.taskmanager.api.TaskStatus.StatusCode;
+
+import com.google.gson.annotations.Expose;
+
+public abstract class AbstractTask implements Runnable
+{
+
+ private static final long RELEASE_DELAY = 5l * 60l * 1000l;
+ private static final Logger LOG = Logger.getLogger( AbstractTask.class );
+
+ /*
+ * To be set from task invocation (json data)
+ */
+
+ /**
+ * The id of the task instance.
+ */
+ @Expose
+ private String id = null;
+ /**
+ * Parent task. This task won't be started as long as the parent task currently
+ * waiting for execution or is being executed. Otherwise this task is available for execution.
+ * Note that MAX_INSTANCES is still being taken into account. Set to null to ignore.
+ */
+ @Expose
+ private String parentTask = null;
+ /**
+ * If the parent task failed to execute, don't run this task and fail immediately.
+ */
+ @Expose
+ private boolean failOnParentFail = true;
+
+ /*
+ * Variables we're working with - these should never be set from incoming (json) data
+ */
+
+ /**
+ * Maximum age of a task, if it's not freed explicitly.
+ */
+ private static final long MAX_TASK_AGE = 24l * 3600l * 1000l;
+ /**
+ * timeMillis when this task will be removed. This will be set automatically to something
+ * reasonable like 24 hours, once the job has finished. This is to prevent clogging the task
+ * manager with finished jobs over time. Note that you can explicitly remove a job using the
+ * "release" command.
+ */
+ private volatile long removalDeadline = System.currentTimeMillis() + MAX_TASK_AGE;
+ /**
+ * True if init() has been called. Task will not start if this is false.
+ */
+ private volatile boolean initDone = false;
+ /**
+ * Status of Task
+ */
+ private TaskStatus status = TaskStatus.ts_waiting;
+ /**
+ * Reference to parent task
+ */
+ private AbstractTask parent = null;
+
+ /**
+ * Default constructor which should not be overridden
+ */
+ public AbstractTask()
+ {
+ this.id = UUID.randomUUID().toString();
+ }
+
+ /*
+ * Overridable methods
+ */
+
+ /**
+ * Initialize the task; method used by the {@link Taskmanager}.
+ * Put your own initialization code in initTask()
+ */
+ public final boolean init( AbstractTask parent )
+ {
+ if ( this.initDone ) {
+ LOG.fatal( "init() called twice on " + this.getClass().getSimpleName() );
+ System.exit( 1 );
+ }
+ this.parent = parent;
+ this.status = new TaskStatus( StatusCode.TASK_WAITING, this.id );
+ this.initDone = true;
+ boolean ret;
+ try {
+ ret = this.initTask();
+ } catch ( Throwable t ) {
+ ret = false;
+ }
+ if ( !ret ) {
+ this.status.statusCode = StatusCode.TASK_ERROR;
+ }
+ return ret;
+ }
+
+ /**
+ * Your own initialization code. This is run synchronously within the network
+ * handling thread, so do NOT put anything here that might take longer than a few milliseconds!
+ * You should usually only validate the input to the task here, and return false if it is
+ * invalid.
+ *
+ * @return - true if the task should be executed by the scheduler (ie init was successful)
+ * - false if init was not successful, or you don't need to do any more processing
+ */
+ protected abstract boolean initTask();
+
+ /**
+ * This is where you put your huge work stuff that takes ages.
+ */
+ protected abstract boolean execute();
+
+ /*
+ * Final methods
+ */
+
+ /**
+ * Get id of parent task.
+ *
+ * @return id of parent task, null if no parent set
+ *
+ */
+ public String getParentTaskId()
+ {
+ return this.parentTask;
+ }
+
+ /**
+ * Set the custom status data object to be returned on status requests.
+ * For simple tasks it should be sufficient to set this once before executing
+ * the task starts, and then just update fields in that class while the
+ * task is running. For cases where you have complicated data structures or
+ * multiple values that need to stay in sync you could create a new instance
+ * every time, fill it with values, and then call this method.
+ *
+ * @param obj the object containing you specific task data
+ */
+ protected final void setStatusObject( Object obj )
+ {
+ status.setStatusObject( obj );
+ }
+
+ /**
+ * Get current status of task.
+ */
+ public final TaskStatus getStatus()
+ {
+ if ( this.initDone && this.parentTask != null ) {
+ final StatusCode parentStatus = parent.getStatusCode();
+ switch ( parentStatus ) {
+ case DUPLICATE_ID:
+ case NO_SUCH_INSTANCE:
+ case NO_SUCH_TASK:
+ case NO_SUCH_CONSTRUCTOR:
+ case PARENT_FAILED:
+ case TASK_ERROR:
+ if ( this.failOnParentFail )
+ this.status.statusCode = StatusCode.PARENT_FAILED;
+ this.parentTask = null;
+ break;
+ default:
+ break;
+ }
+ }
+ return this.status;
+ }
+
+ /**
+ * Get status code if task.
+ *
+ * @return
+ */
+ public final StatusCode getStatusCode()
+ {
+ return getStatus().getStatusCode();
+ }
+
+ /**
+ * Get id of task
+ */
+ public final String getId()
+ {
+ return this.id;
+ }
+
+ /**
+ * Getter for failOnParentTask
+ */
+ public final boolean getFailOnParentFail()
+ {
+ return this.failOnParentFail;
+ }
+
+ /**
+ * Release the task: If the task is still pending/running, it will be removed from the task
+ * manager as soon as it finishes. So you can't query for its result later. If the task has
+ * already finished (either successful or failed), it will be removed (almost) immediately.
+ */
+ public final void release()
+ {
+ this.removalDeadline = Math.min( this.removalDeadline, System.currentTimeMillis() + RELEASE_DELAY );
+ }
+
+ /**
+ * Can this task be started? This checks all the conditions:
+ * - Has this task been initialized yet?
+ * - Is it actually waiting for execution?
+ * - Does it depend on a parent task?
+ * -- If so, did the parent finish?
+ * -- Or did it fail?
+ * --- If so, should this task only run if it didn't fail?
+ *
+ * @return true iff this task can be started
+ */
+ public final boolean canStart()
+ {
+ if ( !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) {
+ return false;
+ }
+ if ( this.parent == null )
+ return true;
+ return parent.getStatusCode() != StatusCode.TASK_WAITING && parent.getStatusCode() != StatusCode.TASK_PROCESSING;
+ }
+
+ /**
+ * Checks whether this task can be removed from the task manager.
+ * A task can be removed if it is not executing or waiting for execution, and
+ * its removal deadline has been reached.
+ *
+ * @return true if it can be released
+ */
+ public final boolean canBeReleased()
+ {
+ if ( this.status.statusCode == StatusCode.TASK_WAITING || this.status.statusCode == StatusCode.TASK_PROCESSING )
+ return false;
+ return this.removalDeadline != 0 && System.currentTimeMillis() > this.removalDeadline;
+ }
+
+ /**
+ * Execute the task, wrapped in some sanity checks.
+ */
+ @Override
+ public final void run()
+ {
+ synchronized ( this ) {
+ if ( this.status.statusCode != StatusCode.TASK_WAITING )
+ throw new RuntimeException( "Tried to launch task " + this.getClass().getSimpleName() + " twice!" );
+ if ( !this.initDone )
+ throw new RuntimeException( "Tried to launch " + this.getClass().getSimpleName() + " without initializing it!" );
+ this.status.statusCode = StatusCode.TASK_PROCESSING;
+ }
+ boolean ret;
+ try {
+ ret = execute();
+ } catch ( Throwable t ) {
+ LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception: " + t.toString() );
+ ret = false;
+ }
+ if ( ret ) {
+ this.status.statusCode = StatusCode.TASK_FINISHED;
+ } else {
+ this.status.statusCode = StatusCode.TASK_ERROR;
+ }
+ }
+
+ /**
+ * This is called when a client requests the status of this task. In case you
+ * want to return complex structures like Lists, which are not thread safe, you
+ * might want to keep that list outside the status class you return, and only
+ * create a copy of it for the status class in this function.
+ * If you only return more or less atomic data, you don't need to override
+ * this function
+ */
+ protected void updateStatus()
+ {
+
+ }
+}
diff --git a/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java b/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java
new file mode 100644
index 0000000..ea71ea5
--- /dev/null
+++ b/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java
@@ -0,0 +1,169 @@
+package org.openslx.taskmanager.api;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.log4j.Logger;
+
+public abstract class SystemCommandTask extends AbstractTask
+{
+
+ private static final Logger log = Logger.getLogger( SystemCommandTask.class );
+
+ private String[] command = null;
+
+ private Process process = null;
+
+
+ @Override
+ protected final boolean execute()
+ {
+ command = initCommandLine();
+ if ( command == null || command.length == 0 ) {
+ return processEnded( -1 );
+ }
+
+ ProcessBuilder pb = new ProcessBuilder( command );
+ pb.directory( new File( "/" ) );
+
+ try {
+
+ // Create process
+ process = pb.start();
+ final Process p = process;
+ processStarted();
+ p.getOutputStream();
+
+ // Read its stdout
+ Thread stdout = new Thread( new Runnable() {
+ @Override
+ public void run()
+ {
+ try {
+ BufferedReader reader = new BufferedReader( new InputStreamReader( p.getInputStream() ) );
+ String line;
+ while ( ( line = reader.readLine() ) != null ) {
+ synchronized ( p ) {
+ processStdOut( line );
+ }
+ }
+ } catch ( IOException e ) {
+ }
+ }
+ } );
+ // Read its stderr
+ Thread stderr = new Thread( new Runnable() {
+ @Override
+ public void run()
+ {
+ try {
+ BufferedReader reader = new BufferedReader( new InputStreamReader( p.getErrorStream() ) );
+ String line;
+ while ( ( line = reader.readLine() ) != null ) {
+ synchronized ( p ) {
+ processStdErr( line );
+ }
+ }
+ } catch ( IOException e ) {
+ }
+ }
+ } );
+
+ stdout.start();
+ stderr.start();
+
+ // Wait for everything
+ stdout.join();
+ stderr.join();
+ process.waitFor();
+
+ return processEnded( process.exitValue() );
+
+ } catch ( IOException e ) {
+ log.warn( "Process of task " + getId() + " died." );
+ processStdErr( e.toString() );
+ return processEnded( -2 );
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch ( Exception e ) {
+ log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() );
+ processStdErr( e.toString() );
+ return processEnded( -3 );
+ } finally {
+ if ( process != null )
+ process.destroy();
+ }
+ }
+
+ /**
+ * Write data to the process's stdin.
+ *
+ * @param data stuff to write
+ * @return success or failure mapped to a boolean in a really complicated way
+ */
+ protected final boolean toStdIn(byte[] data)
+ {
+ try {
+ process.getOutputStream().write( data );
+ } catch ( IOException e ) {
+ e.printStackTrace();
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Write text to the process's stdin.
+ *
+ * @param text stuff to write
+ * @return success or failure mapped to a boolean in a really complicated way
+ */
+ protected final boolean toStdIn(String text)
+ {
+ return toStdIn( text.getBytes( StandardCharsets.UTF_8 ) );
+ }
+
+ /**
+ * Called to get the command line. Each argument should be a separate array
+ * element. Returning null means the task should not run (as the arguments
+ * were probably faulty).
+ *
+ * @return List of arguments. First element is the command itself.
+ */
+ protected abstract String[] initCommandLine();
+
+ /**
+ * Called when the process has been successfully started.
+ */
+ protected void processStarted()
+ {
+ }
+
+ /**
+ * Called when the process has finished running
+ *
+ * @param exitCode the process' exit code
+ * @return
+ */
+ protected abstract boolean processEnded( int exitCode );
+
+ /**
+ * Called when a line has been read from the process' stdout.
+ *
+ * @param line The line read from the process, without any newline characters
+ */
+ protected abstract void processStdOut( String line );
+
+ /**
+ * Called when a line has been read from the process' stderr.
+ * Trailing newline is removed.
+ *
+ * @param line The line read from the process, without any newline characters
+ */
+ protected abstract void processStdErr( String line );
+
+}
diff --git a/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java
new file mode 100644
index 0000000..f01ccc4
--- /dev/null
+++ b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java
@@ -0,0 +1,130 @@
+package org.openslx.taskmanager.api;
+
+/**
+ * This is what is returned on a status request.
+ * To return custom data for your task, call {@link AbstractTask#setStatusObject(Object)} from your
+ * Task.
+ * This class is serialized entirely, not using the Exposed annotation.
+ */
+public final class TaskStatus
+{
+
+ public enum StatusCode
+ {
+ TASK_WAITING,
+ TASK_PROCESSING,
+ TASK_FINISHED,
+ TASK_ERROR,
+ NO_SUCH_INSTANCE,
+ NO_SUCH_TASK,
+ NO_SUCH_CONSTRUCTOR,
+ DUPLICATE_ID,
+ PARENT_FAILED,
+ JSON_ERROR
+ }
+
+ /**
+ * Overall status of the task. Only set by base methods of the AbstractTask class.
+ */
+ protected StatusCode statusCode;
+ /**
+ * Custom data a task might want to return on status requests.
+ */
+ private Object data = null;
+
+ @SuppressWarnings( "unused" )
+ private final String id;
+
+ /*
+ * Static members
+ */
+
+ /**
+ * Create a single "duplicate id" status we return if trying to launch a task with an id already
+ * in use.
+ */
+ public static final TaskStatus ts_duplicateId = new TaskStatus( StatusCode.DUPLICATE_ID );
+ /**
+ * Create a single "no such constructor" status we return if a task could be found, but not
+ * instantiated.
+ */
+ public static final TaskStatus ts_noSuchConstructor = new TaskStatus( StatusCode.NO_SUCH_CONSTRUCTOR );
+ /**
+ * Create a single "no such task" status we return if a task should be invoked that
+ * doesn't actually exist.
+ */
+ public static final TaskStatus ts_noSuchTask = new TaskStatus( StatusCode.NO_SUCH_TASK );
+ /**
+ * Create a single "no such task" status we return if an action on a task instance is requested
+ * that doesn't actually exist.
+ */
+ public static final TaskStatus ts_noSuchInstance = new TaskStatus( StatusCode.NO_SUCH_INSTANCE );
+ /**
+ * Create a single "parent failed" status we return as status for a task which depends on another
+ * task, and that other task failed to execute, or failed during execution.
+ */
+ public static final TaskStatus ts_parentFailed = new TaskStatus( StatusCode.PARENT_FAILED );
+ /**
+ * Create a single "task waiting" status we return as status for a task that is waiting for
+ * execution.
+ */
+ public static final TaskStatus ts_waiting = new TaskStatus( StatusCode.TASK_WAITING );
+ /**
+ * Create a single "task error" status we can use everywhere.
+ */
+ public static final TaskStatus ts_error = new TaskStatus( StatusCode.TASK_ERROR );
+ /**
+ * Create a single "json error" status we can use everywhere.
+ */
+ public static final TaskStatus ts_jsonError = new TaskStatus( StatusCode.JSON_ERROR );
+
+ /**
+ * Create new TaskStatus with given initial status code
+ * and id.
+ *
+ * @param status The status code to initialize the TaskStatus with
+ * @param id id of task this status belongs to
+ */
+ public TaskStatus( final StatusCode status, final String id )
+ {
+ this.statusCode = status;
+ this.id = id;
+ }
+
+ /**
+ * Create new TaskStatus with given initial status code.
+ *
+ * @param status The status code to initialize the TaskStatus with
+ */
+ public TaskStatus( final StatusCode status )
+ {
+ this( status, null );
+ }
+
+ /**
+ * Get the status code of this TaskStatus
+ *
+ * @return
+ */
+ public final StatusCode getStatusCode()
+ {
+ return this.statusCode;
+ }
+
+ /**
+ * Set the custom status data.
+ *
+ * @param obj custom status object
+ */
+ protected void setStatusObject( Object obj )
+ {
+ this.data = obj;
+ }
+
+ public String getStatusObjectClassName()
+ {
+ if ( this.data == null ) return "(null)";
+ return this.data.getClass().getSimpleName();
+ }
+
+}
diff --git a/daemon/pom.xml b/daemon/pom.xml
new file mode 100644
index 0000000..f25d610
--- /dev/null
+++ b/daemon/pom.xml
@@ -0,0 +1,66 @@
+
+ 4.0.0
+ org.openslx.taskmanager
+ taskmanager-daemon
+ jar
+ 1.0-SNAPSHOT
+ taskmanager-daemon
+ http://maven.apache.org
+
+
+ UTF-8
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+
+ 1.7
+
+
+
+ maven-assembly-plugin
+
+
+ package
+
+ single
+
+
+
+
+
+
+ org.openslx.taskmanager.App
+
+
+
+ jar-with-dependencies
+
+
+
+
+
+
+
+
+ junit
+ junit
+ 3.8.1
+ test
+
+
+ org.openslx.taskmanager
+ taskmanager-api
+ ${project.version}
+ compile
+
+
+
+
+
diff --git a/daemon/src/main/java/org/openslx/taskmanager/App.java b/daemon/src/main/java/org/openslx/taskmanager/App.java
new file mode 100644
index 0000000..c233229
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/App.java
@@ -0,0 +1,47 @@
+package org.openslx.taskmanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.SocketException;
+
+import junit.runner.ClassPathTestCollector;
+
+import org.apache.log4j.BasicConfigurator;
+import org.openslx.taskmanager.main.Taskmanager;
+import org.openslx.taskmanager.network.NetworkHandler;
+import org.openslx.taskmanager.util.ClassLoaderHack;
+
+/**
+ * Hello world!
+ *
+ */
+public class App
+{
+
+ public static void main( String[] args ) throws SocketException
+ {
+ // Load all task plugins
+ File folder = new File( "./plugins" );
+ if ( !folder.exists() ) {
+ System.out.println( "No plugin folder found - nothing to do." );
+ System.exit( 1 );
+ }
+ for ( File file : folder.listFiles() ) {
+ if ( !file.isFile() || !file.toString().endsWith( ".jar" ) )
+ continue;
+ try {
+ ClassLoaderHack.addFile( file );
+ } catch ( IOException e ) {
+ e.printStackTrace();
+ System.out.println( "Could not add plugin: " + file.toString() );
+ System.exit( 1 );
+ }
+ }
+ BasicConfigurator.configure();
+ Environment.load( "config/environment" );
+ NetworkHandler.init();
+ Taskmanager.run();
+ // Wait for everything
+ NetworkHandler.join();
+ }
+}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/Environment.java b/daemon/src/main/java/org/openslx/taskmanager/Environment.java
new file mode 100644
index 0000000..acbfad4
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/Environment.java
@@ -0,0 +1,67 @@
+package org.openslx.taskmanager;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Holds the environment that tasks running a system command *should*
+ * use. The environment is read from a config file.
+ */
+public class Environment
+{
+
+ private static final Logger log = Logger.getLogger( Environment.class );
+
+ private static Map env = new LinkedHashMap<>();
+
+ public static boolean load( String fileName )
+ {
+ try {
+ FileReader fileReader = new FileReader( fileName );
+ BufferedReader bufferedReader = new BufferedReader( fileReader );
+
+ Map env = new LinkedHashMap<>();
+ String line = null;
+ while ( ( line = bufferedReader.readLine() ) != null ) {
+ if ( !line.matches( "^[a-zA-Z0-9_]+=" ) )
+ continue;
+ String[] part = line.split( "=", 2 );
+ env.put( part[0], part[1] );
+ }
+
+ bufferedReader.close();
+
+ Environment.env = env;
+ log.info( "Loaded " + env.size() + " environment lines." );
+ } catch ( IOException e ) {
+ log.info( "Could not load environment definition from " + fileName + ". Processes might use the same environment as this thread." );
+ return false;
+ }
+ return true;
+ }
+
+ public static void set( Map environment )
+ {
+ environment.clear();
+ environment.putAll( env );
+ }
+
+ public static String[] get()
+ {
+ // Get reference to env so it doesn't change while in this function (load() from other thread)
+ Map env = Environment.env;
+ String ret[] = new String[ env.size() ];
+ int i = 0;
+ for ( Entry it : env.entrySet() ) {
+ ret[i++] = it.getKey() + "=" + it.getValue();
+ }
+ return ret;
+ }
+
+}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/Global.java b/daemon/src/main/java/org/openslx/taskmanager/Global.java
new file mode 100644
index 0000000..7ca2c2d
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/Global.java
@@ -0,0 +1,32 @@
+package org.openslx.taskmanager;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class Global
+{
+
+ public static final int LISTEN_PORT = 9215;
+
+ public static final String TASK_PACKAGE_NAME = "org.openslx.taskmanager.tasks";
+
+ public static final long MAX_TASK_AGE = 24l * 3600l * 1000l;
+
+ public static final InetAddress LISTEN_ADDRESS;
+
+ public static volatile boolean doShutdown = false;
+
+ static
+ {
+ InetAddress la;
+ try {
+ la = Inet4Address.getByName( "127.0.0.1" );
+ } catch ( UnknownHostException e ) {
+ la = null;
+ e.printStackTrace();
+ }
+ LISTEN_ADDRESS = la;
+ }
+
+}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
new file mode 100644
index 0000000..f3707ed
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
@@ -0,0 +1,200 @@
+package org.openslx.taskmanager.main;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.openslx.taskmanager.Global;
+import org.openslx.taskmanager.api.AbstractTask;
+import org.openslx.taskmanager.api.TaskStatus;
+import org.openslx.taskmanager.util.ClassLoaderHack;
+import org.openslx.taskmanager.util.Util;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
+public class Taskmanager
+{
+
+ private static final Logger log = Logger.getLogger( Taskmanager.class );
+
+ private static final ExecutorService threadPool = Executors.newCachedThreadPool();
+
+ /**
+ * Static gson object for (de)serialization
+ */
+ private static final Gson gson = Util.explicitGsonInstance();
+
+ /**
+ * Cache of known tasks
+ */
+ private static final Map> tasks = new ConcurrentHashMap<>();
+
+ /**
+ * All the running/finished task instances. The mainloop will call wait() on this and this object
+ * is notified as soon as the mainloop should check if there is any task available that can be
+ * run.
+ */
+ private static final Map instances = new ConcurrentHashMap<>();
+
+ private static final Lock workLock = new ReentrantLock();
+ private static final Condition doCheckForWork = workLock.newCondition();
+
+ /*
+ * Static methods
+ */
+
+ /**
+ * Return the status of the task with the given ID. If the task does not
+ * exist, a pseudo-status instance is returned, with a status code of
+ * NO_SUCH_TASK. This means it is guaranteed that this function never returns
+ * null.
+ *
+ * @param taskId - ID of the task to retrieve the status of
+ * @return TaskStatus
+ */
+ public static TaskStatus getTaskStatus( final String taskId )
+ {
+ AbstractTask task = instances.get( taskId );
+ if ( task == null )
+ return TaskStatus.ts_noSuchInstance;
+ return task.getStatus();
+ }
+
+ /**
+ * Run the requested method as a new task. The json data may contain an explicit id for that
+ * task, otherwise a random id is generated. If there's already a task running with the desired
+ * id, an error is returned, and no new task will be created. The desired id has to be added to
+ * the json data, as a field called "id".
+ *
+ * @param task - The task name to be executed.
+ * @param jsonData - JsonData to be passed to the task. All fields except "id" are ignored by the
+ * task manager.
+ * @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if
+ * there is no task registered under the given name.
+ */
+ public static TaskStatus submitTask( final String task, final String jsonData )
+ {
+ // Get task class
+ Class extends AbstractTask> taskClass;
+ synchronized ( tasks ) {
+ taskClass = tasks.get( task );
+ if ( taskClass == null ) { // Not in map; either never called yet, or doesn't exist
+ taskClass = ClassLoaderHack.getClass( Global.TASK_PACKAGE_NAME, task, AbstractTask.class );
+ if ( taskClass == null ) { // Simply doesn't exist
+ log.warn( "Could not find " + task + " in " + Global.TASK_PACKAGE_NAME );
+ return TaskStatus.ts_noSuchTask;
+ }
+ tasks.put( task, taskClass ); // Cache for all future calls
+ }
+ }
+ // Instantiate using Gson
+ final AbstractTask taskInstance;
+ try {
+ taskInstance = gson.fromJson( jsonData, taskClass );
+ } catch ( JsonSyntaxException e ) {
+ log.warn( "Invocation request for " + task + " with invalid json: " + jsonData );
+ return TaskStatus.ts_jsonError;
+ }
+ if ( taskInstance == null ) {
+ log.warn( task + " exists, but could not be instanciated!" );
+ return TaskStatus.ts_noSuchConstructor;
+ }
+ if ( taskInstance.getId() == null ) {
+ log.warn( "Tried to launch " + task + " with null-id" );
+ return TaskStatus.ts_noSuchConstructor;
+ }
+ // Now check for id collision
+ synchronized ( instances ) {
+ if ( instances.containsKey( taskInstance.getId() ) ) {
+ log.info( "Ignoring task invocation of " + task + ": Duplicate ID: " + taskInstance.getId() );
+ return TaskStatus.ts_duplicateId;
+ }
+ instances.put( taskInstance.getId(), taskInstance );
+ }
+ AbstractTask parent = null;
+ if ( taskInstance.getParentTaskId() != null )
+ parent = instances.get( taskInstance.getParentTaskId() );
+ if ( taskInstance.init( parent ) ) {
+ checkForWork();
+ }
+ return taskInstance.getStatus();
+ }
+
+ public static void releaseTask( String taskId )
+ {
+ final AbstractTask task = instances.get( taskId );
+ if ( task != null )
+ task.release();
+ }
+
+ /**
+ * Wakes up the Taskmanager's mainloop so it will check if any of the current task instances
+ * is waiting for execution.
+ */
+ protected static void checkForWork()
+ {
+ workLock.lock();
+ try {
+ doCheckForWork.signalAll();
+ } finally {
+ workLock.unlock();
+ }
+ }
+
+ public static void run()
+ {
+ try {
+ while ( !Global.doShutdown ) {
+ workLock.lock();
+ try {
+ doCheckForWork.await( 1, TimeUnit.MINUTES );
+ } finally {
+ workLock.unlock();
+ }
+ try {
+ for ( Iterator it = instances.values().iterator(); it.hasNext(); ) {
+ AbstractTask task = it.next();
+ if ( task.canBeReleased() ) {
+ it.remove();
+ log.debug( "Released task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
+ continue;
+ }
+ if ( task.canStart() ) {
+ threadPool.execute( task );
+ log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
+ }
+ }
+ } catch ( RejectedExecutionException e ) {
+ log.warn( "ThreadPool rejected a task (" + e.getMessage() + ")" );
+ }
+ }
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ } finally {
+ log.info( "Taskmanager mainloop finished." );
+ Global.doShutdown = true;
+ log.info( "Shutting down worker thread pool...." );
+ threadPool.shutdown();
+ try {
+ if ( threadPool.awaitTermination( 5, TimeUnit.MINUTES ) ) {
+ log.info( "Thread pool shut down!" );
+ } else {
+ log.info( "Trying to kill still running tasks...." );
+ threadPool.shutdownNow();
+ }
+ } catch ( InterruptedException e ) {
+ log.info( "Interrupted!" );
+ }
+ }
+ }
+
+}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java
new file mode 100644
index 0000000..3e2c8fd
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java
@@ -0,0 +1,180 @@
+package org.openslx.taskmanager.network;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+import org.openslx.taskmanager.Global;
+
+/**
+ * The network listener that will receive incoming UDP packets, try to process
+ * them, and then send a reply.
+ */
+public class NetworkHandler implements Runnable
+{
+
+ private static final Logger log = Logger.getLogger( NetworkHandler.class );
+
+ // Static part
+
+ private static Thread recvThread = null;
+ private static Thread sendThread = null;
+ /**
+ * Sender instance (Runnable handling outgoing packets)
+ */
+ private static Sender sender = null;
+ /**
+ * UDP socket for sending and receiving.
+ */
+ private static DatagramSocket socket;
+
+ /**
+ * Initialize the NetworkHandler by starting threads and opening the socket.
+ */
+ public static void init() throws SocketException
+ {
+ if ( recvThread != null )
+ throw new RuntimeException( "Already initialized" );
+ socket = new DatagramSocket( Global.LISTEN_PORT, Global.LISTEN_ADDRESS );
+ recvThread = new Thread( new NetworkHandler() );
+ recvThread.start();
+ sendThread = new Thread( sender = new Sender() );
+ sendThread.start();
+ }
+
+ public static void shutdown()
+ {
+ socket.close();
+ }
+
+ public static void join()
+ {
+ try {
+ recvThread.join();
+ sendThread.join();
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ // Class part
+
+ /**
+ * Prepare and enqueue reply for client request.
+ * Only ever to be called from the receiving thread. The reply message is crafted
+ * and then handed over to the sending thread.
+ *
+ * @param destination SocketAddress of the client
+ * @param messageId The same ID the client used in it's request.
+ * It's echoed back to the client to enable request bursts, and has no meaning for the
+ * server.
+ * @param status A TaskStatus instance to be serialized to json and sent to the client.
+ */
+ private void send( SocketAddress destination, byte[] buffer )
+ {
+ final DatagramPacket packet;
+ try {
+ packet = new DatagramPacket( buffer, buffer.length, destination );
+ } catch ( SocketException e ) {
+ log.warn( "Could not construct datagram packet for target " + destination.toString() );
+ e.printStackTrace();
+ return;
+ }
+ sender.send( packet );
+ }
+
+ /**
+ * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode
+ */
+ @Override
+ public void run()
+ {
+ byte readBuffer[] = new byte[ 66000 ];
+ try {
+ while ( !Global.doShutdown ) {
+ DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length );
+ try {
+ socket.receive( packet );
+ } catch ( IOException e ) {
+ log.info( "IOException on UDP socket when reading: " + e.getMessage() );
+ Thread.sleep( 100 );
+ continue;
+ }
+ if ( packet.getLength() < 2 ) {
+ log.debug( "Message too short" );
+ continue;
+ }
+ String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
+ try {
+ byte[] reply = RequestParser.handle( payload );
+ if ( reply != null )
+ send( packet.getSocketAddress(), reply );
+ } catch ( Throwable t ) {
+ log.error( "Exception in RequestParser: " + t.getMessage() );
+ t.printStackTrace();
+ }
+ }
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ } finally {
+ Global.doShutdown = true;
+ log.info( "UDP receiver finished." );
+ }
+ }
+
+ /**
+ * Private sending thread.
+ * Use blocking queue, wait for packet to be added to it, then try to send.
+ */
+ static class Sender implements Runnable
+ {
+
+ /**
+ * Queue to stuff outgoing packets into.
+ */
+ private final BlockingQueue queue = new LinkedBlockingQueue<>( 128 );
+
+ /**
+ * Wait until something is put into the queue, then send it.
+ */
+ @Override
+ public void run()
+ {
+ try {
+ while ( !Global.doShutdown ) {
+ final DatagramPacket packet;
+ packet = queue.take();
+ try {
+ socket.send( packet );
+ } catch ( IOException e ) {
+ log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString() );
+ }
+ }
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ } finally {
+ Global.doShutdown = true;
+ log.info( "UDP sender finished." );
+ }
+ }
+
+ /**
+ * Add something to the outgoing packet queue.
+ * Called from the receiving thread.
+ */
+ public void send( DatagramPacket packet )
+ {
+ if ( queue.offer( packet ) )
+ return;
+ log.warn( "Could not add packet to queue: Full" );
+ }
+
+ }
+
+}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
new file mode 100644
index 0000000..d2cfb21
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
@@ -0,0 +1,66 @@
+package org.openslx.taskmanager.network;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.log4j.Logger;
+import org.openslx.taskmanager.api.TaskStatus;
+import org.openslx.taskmanager.main.Taskmanager;
+
+import com.google.gson.Gson;
+
+public class RequestParser
+{
+ private static final Logger log = Logger.getLogger( RequestParser.class );
+
+ /**
+ * Our very own gson instance (for serializing replies)
+ */
+ private static final Gson sendGson = new Gson();
+
+ /**
+ * Handle the given unparsed request.
+ *
+ * @param source source of the request, where the reply will be send to (if any)
+ * @param payload Packet data received from network, already converted to a string
+ */
+ public static byte[] handle( String payload )
+ {
+ String[] parts = payload.split( " *, *", 3 );
+ // Message format is ", , "
+ if ( parts.length != 3 ) {
+ log.debug( "Could not split message" );
+ return null;
+ }
+ // Look at parts[1], if it's "status" it's a request for the task
+ // with the ID given in parts[2]
+ if ( parts[1].equals( "status" ) ) {
+ TaskStatus status = Taskmanager.getTaskStatus( parts[2] );
+ return serialize( parts[0], status );
+ }
+ // Now check if parts[1] is "release"
+ if ( parts[1].equals( "release" ) ) {
+ Taskmanager.releaseTask( parts[2] );
+ return null;
+ }
+ // Anything else in parts[0] will be treated as a fresh task invocation, so let's
+ // pass it on to the task manager.
+ TaskStatus status = Taskmanager.submitTask( parts[1], parts[2] );
+ return serialize( parts[0], status );
+ }
+
+ private static byte[] serialize( String messageId, TaskStatus status )
+ {
+ String data;
+ try {
+ synchronized ( sendGson ) {
+ data = sendGson.toJson( status );
+ }
+ } catch ( Throwable e ) {
+ log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName() );
+ log.warn( e.toString() );
+ return null;
+ }
+ return ( messageId + ',' + data ).getBytes( StandardCharsets.UTF_8 );
+ }
+
+}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java b/daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java
new file mode 100644
index 0000000..1a02ff7
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java
@@ -0,0 +1,66 @@
+package org.openslx.taskmanager.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+public class ClassLoaderHack
+{
+ @SuppressWarnings( "rawtypes" )
+ private static final Class[] parameters = new Class[] { URL.class };
+
+ public static void addFile( String s ) throws IOException
+ {
+ File f = new File( s );
+ addFile( f );
+ }
+
+ public static void addFile( File f ) throws IOException
+ {
+ addURL( f.toURI().toURL() );
+ }
+
+ public static void addURL( URL u ) throws IOException
+ {
+ URLClassLoader sysloader = (URLClassLoader)ClassLoader.getSystemClassLoader();
+ Class sysclass = URLClassLoader.class;
+
+ try {
+ Method method = sysclass.getDeclaredMethod( "addURL", parameters );
+ method.setAccessible( true );
+ method.invoke( sysloader, new Object[] { u } );
+ System.out.println( "Loaded " + u.toString() );
+ } catch ( Throwable t ) {
+ t.printStackTrace();
+ throw new IOException( "Error, could not add URL to system classloader" );
+ }
+
+ }
+
+ /**
+ * Get Class meta-object for given class in package. Only return class if it's somehow
+ * extending from given baseClass.
+ *
+ * @param packageName package to search in
+ * @param className name of class to look for
+ * @param baseClass class the class in question has to be extended from
+ * @return class meta object, or null if not found
+ */
+ @SuppressWarnings( "unchecked" )
+ public static Class extends T> getClass( String packageName, String className, Class baseClass )
+ {
+ final Class> clazz;
+ try {
+ clazz = Class.forName( packageName + '.' + className, true, ClassLoader.getSystemClassLoader() );
+ } catch ( ClassNotFoundException e ) {
+ return null;
+ }
+ if ( clazz == null || ( baseClass != null && !baseClass.isAssignableFrom( clazz ) ) ) {
+ return null;
+ }
+ return (Class extends T>)clazz;
+ }
+
+}
diff --git a/daemon/src/main/java/org/openslx/taskmanager/util/Util.java b/daemon/src/main/java/org/openslx/taskmanager/util/Util.java
new file mode 100644
index 0000000..bf52ecb
--- /dev/null
+++ b/daemon/src/main/java/org/openslx/taskmanager/util/Util.java
@@ -0,0 +1,24 @@
+package org.openslx.taskmanager.util;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class Util
+{
+
+ private static GsonBuilder gsonBuilder = new GsonBuilder();
+
+ /**
+ * Small helper to create a gson instance that will only handle class members with the
+ * "@Exposed" annotation. Decided against the default of explicitly excluding fields by
+ * making them transient, as you might easily forget to exclude an important field, which
+ * can in turn be a security issue.
+ *
+ * @return Gson instance
+ */
+ public static Gson explicitGsonInstance()
+ {
+ return gsonBuilder.excludeFieldsWithoutExposeAnnotation().create();
+ }
+
+}
diff --git a/daemon/src/test/java/org/openslx/taskmanager/AppTest.java b/daemon/src/test/java/org/openslx/taskmanager/AppTest.java
new file mode 100644
index 0000000..feac8e7
--- /dev/null
+++ b/daemon/src/test/java/org/openslx/taskmanager/AppTest.java
@@ -0,0 +1,38 @@
+package org.openslx.taskmanager;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+ extends TestCase
+{
+ /**
+ * Create the test case
+ *
+ * @param testName name of the test case
+ */
+ public AppTest( String testName )
+ {
+ super( testName );
+ }
+
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite()
+ {
+ return new TestSuite( AppTest.class );
+ }
+
+ /**
+ * Rigourous Test :-)
+ */
+ public void testApp()
+ {
+ assertTrue( true );
+ }
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..c07ee72
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,38 @@
+
+
+ 4.0.0
+
+ org.openslx.taskmanager
+ taskmanager
+ 1.0-SNAPSHOT
+ pom
+
+ taskmanager
+ http://google.de/
+
+
+ daemon
+ api
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+
+ 1.7
+
+
+
+
+
+
+ UTF-8
+
+
+
+
--
cgit v1.2.3-55-g7522