From efb5ad9f5fe48a77b6cd14e7bd2b25e3b13ecb1f Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 3 Jun 2014 16:44:56 +0200 Subject: Initial commit --- daemon/pom.xml | 66 +++++++ .../src/main/java/org/openslx/taskmanager/App.java | 47 +++++ .../java/org/openslx/taskmanager/Environment.java | 67 +++++++ .../main/java/org/openslx/taskmanager/Global.java | 32 ++++ .../org/openslx/taskmanager/main/Taskmanager.java | 200 +++++++++++++++++++++ .../taskmanager/network/NetworkHandler.java | 180 +++++++++++++++++++ .../openslx/taskmanager/network/RequestParser.java | 66 +++++++ .../openslx/taskmanager/util/ClassLoaderHack.java | 66 +++++++ .../java/org/openslx/taskmanager/util/Util.java | 24 +++ .../test/java/org/openslx/taskmanager/AppTest.java | 38 ++++ 10 files changed, 786 insertions(+) create mode 100644 daemon/pom.xml create mode 100644 daemon/src/main/java/org/openslx/taskmanager/App.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/Environment.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/Global.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java create mode 100644 daemon/src/main/java/org/openslx/taskmanager/util/Util.java create mode 100644 daemon/src/test/java/org/openslx/taskmanager/AppTest.java (limited to 'daemon') diff --git a/daemon/pom.xml b/daemon/pom.xml new file mode 100644 index 0000000..f25d610 --- /dev/null +++ b/daemon/pom.xml @@ -0,0 +1,66 @@ + + 4.0.0 + org.openslx.taskmanager + taskmanager-daemon + jar + 1.0-SNAPSHOT + taskmanager-daemon + http://maven.apache.org + + + UTF-8 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.7 + 1.7 + + + + maven-assembly-plugin + + + package + + single + + + + + + + org.openslx.taskmanager.App + + + + jar-with-dependencies + + + + + + + + + junit + junit + 3.8.1 + test + + + org.openslx.taskmanager + taskmanager-api + ${project.version} + compile + + + + + diff --git a/daemon/src/main/java/org/openslx/taskmanager/App.java b/daemon/src/main/java/org/openslx/taskmanager/App.java new file mode 100644 index 0000000..c233229 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/App.java @@ -0,0 +1,47 @@ +package org.openslx.taskmanager; + +import java.io.File; +import java.io.IOException; +import java.net.SocketException; + +import junit.runner.ClassPathTestCollector; + +import org.apache.log4j.BasicConfigurator; +import org.openslx.taskmanager.main.Taskmanager; +import org.openslx.taskmanager.network.NetworkHandler; +import org.openslx.taskmanager.util.ClassLoaderHack; + +/** + * Hello world! + * + */ +public class App +{ + + public static void main( String[] args ) throws SocketException + { + // Load all task plugins + File folder = new File( "./plugins" ); + if ( !folder.exists() ) { + System.out.println( "No plugin folder found - nothing to do." ); + System.exit( 1 ); + } + for ( File file : folder.listFiles() ) { + if ( !file.isFile() || !file.toString().endsWith( ".jar" ) ) + continue; + try { + ClassLoaderHack.addFile( file ); + } catch ( IOException e ) { + e.printStackTrace(); + System.out.println( "Could not add plugin: " + file.toString() ); + System.exit( 1 ); + } + } + BasicConfigurator.configure(); + Environment.load( "config/environment" ); + NetworkHandler.init(); + Taskmanager.run(); + // Wait for everything + NetworkHandler.join(); + } +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/Environment.java b/daemon/src/main/java/org/openslx/taskmanager/Environment.java new file mode 100644 index 0000000..acbfad4 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/Environment.java @@ -0,0 +1,67 @@ +package org.openslx.taskmanager; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.log4j.Logger; + +/** + * Holds the environment that tasks running a system command *should* + * use. The environment is read from a config file. + */ +public class Environment +{ + + private static final Logger log = Logger.getLogger( Environment.class ); + + private static Map env = new LinkedHashMap<>(); + + public static boolean load( String fileName ) + { + try { + FileReader fileReader = new FileReader( fileName ); + BufferedReader bufferedReader = new BufferedReader( fileReader ); + + Map env = new LinkedHashMap<>(); + String line = null; + while ( ( line = bufferedReader.readLine() ) != null ) { + if ( !line.matches( "^[a-zA-Z0-9_]+=" ) ) + continue; + String[] part = line.split( "=", 2 ); + env.put( part[0], part[1] ); + } + + bufferedReader.close(); + + Environment.env = env; + log.info( "Loaded " + env.size() + " environment lines." ); + } catch ( IOException e ) { + log.info( "Could not load environment definition from " + fileName + ". Processes might use the same environment as this thread." ); + return false; + } + return true; + } + + public static void set( Map environment ) + { + environment.clear(); + environment.putAll( env ); + } + + public static String[] get() + { + // Get reference to env so it doesn't change while in this function (load() from other thread) + Map env = Environment.env; + String ret[] = new String[ env.size() ]; + int i = 0; + for ( Entry it : env.entrySet() ) { + ret[i++] = it.getKey() + "=" + it.getValue(); + } + return ret; + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/Global.java b/daemon/src/main/java/org/openslx/taskmanager/Global.java new file mode 100644 index 0000000..7ca2c2d --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/Global.java @@ -0,0 +1,32 @@ +package org.openslx.taskmanager; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class Global +{ + + public static final int LISTEN_PORT = 9215; + + public static final String TASK_PACKAGE_NAME = "org.openslx.taskmanager.tasks"; + + public static final long MAX_TASK_AGE = 24l * 3600l * 1000l; + + public static final InetAddress LISTEN_ADDRESS; + + public static volatile boolean doShutdown = false; + + static + { + InetAddress la; + try { + la = Inet4Address.getByName( "127.0.0.1" ); + } catch ( UnknownHostException e ) { + la = null; + e.printStackTrace(); + } + LISTEN_ADDRESS = la; + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java new file mode 100644 index 0000000..f3707ed --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java @@ -0,0 +1,200 @@ +package org.openslx.taskmanager.main; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.Global; +import org.openslx.taskmanager.api.AbstractTask; +import org.openslx.taskmanager.api.TaskStatus; +import org.openslx.taskmanager.util.ClassLoaderHack; +import org.openslx.taskmanager.util.Util; + +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; + +public class Taskmanager +{ + + private static final Logger log = Logger.getLogger( Taskmanager.class ); + + private static final ExecutorService threadPool = Executors.newCachedThreadPool(); + + /** + * Static gson object for (de)serialization + */ + private static final Gson gson = Util.explicitGsonInstance(); + + /** + * Cache of known tasks + */ + private static final Map> tasks = new ConcurrentHashMap<>(); + + /** + * All the running/finished task instances. The mainloop will call wait() on this and this object + * is notified as soon as the mainloop should check if there is any task available that can be + * run. + */ + private static final Map instances = new ConcurrentHashMap<>(); + + private static final Lock workLock = new ReentrantLock(); + private static final Condition doCheckForWork = workLock.newCondition(); + + /* + * Static methods + */ + + /** + * Return the status of the task with the given ID. If the task does not + * exist, a pseudo-status instance is returned, with a status code of + * NO_SUCH_TASK. This means it is guaranteed that this function never returns + * null. + * + * @param taskId - ID of the task to retrieve the status of + * @return TaskStatus + */ + public static TaskStatus getTaskStatus( final String taskId ) + { + AbstractTask task = instances.get( taskId ); + if ( task == null ) + return TaskStatus.ts_noSuchInstance; + return task.getStatus(); + } + + /** + * Run the requested method as a new task. The json data may contain an explicit id for that + * task, otherwise a random id is generated. If there's already a task running with the desired + * id, an error is returned, and no new task will be created. The desired id has to be added to + * the json data, as a field called "id". + * + * @param task - The task name to be executed. + * @param jsonData - JsonData to be passed to the task. All fields except "id" are ignored by the + * task manager. + * @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if + * there is no task registered under the given name. + */ + public static TaskStatus submitTask( final String task, final String jsonData ) + { + // Get task class + Class taskClass; + synchronized ( tasks ) { + taskClass = tasks.get( task ); + if ( taskClass == null ) { // Not in map; either never called yet, or doesn't exist + taskClass = ClassLoaderHack.getClass( Global.TASK_PACKAGE_NAME, task, AbstractTask.class ); + if ( taskClass == null ) { // Simply doesn't exist + log.warn( "Could not find " + task + " in " + Global.TASK_PACKAGE_NAME ); + return TaskStatus.ts_noSuchTask; + } + tasks.put( task, taskClass ); // Cache for all future calls + } + } + // Instantiate using Gson + final AbstractTask taskInstance; + try { + taskInstance = gson.fromJson( jsonData, taskClass ); + } catch ( JsonSyntaxException e ) { + log.warn( "Invocation request for " + task + " with invalid json: " + jsonData ); + return TaskStatus.ts_jsonError; + } + if ( taskInstance == null ) { + log.warn( task + " exists, but could not be instanciated!" ); + return TaskStatus.ts_noSuchConstructor; + } + if ( taskInstance.getId() == null ) { + log.warn( "Tried to launch " + task + " with null-id" ); + return TaskStatus.ts_noSuchConstructor; + } + // Now check for id collision + synchronized ( instances ) { + if ( instances.containsKey( taskInstance.getId() ) ) { + log.info( "Ignoring task invocation of " + task + ": Duplicate ID: " + taskInstance.getId() ); + return TaskStatus.ts_duplicateId; + } + instances.put( taskInstance.getId(), taskInstance ); + } + AbstractTask parent = null; + if ( taskInstance.getParentTaskId() != null ) + parent = instances.get( taskInstance.getParentTaskId() ); + if ( taskInstance.init( parent ) ) { + checkForWork(); + } + return taskInstance.getStatus(); + } + + public static void releaseTask( String taskId ) + { + final AbstractTask task = instances.get( taskId ); + if ( task != null ) + task.release(); + } + + /** + * Wakes up the Taskmanager's mainloop so it will check if any of the current task instances + * is waiting for execution. + */ + protected static void checkForWork() + { + workLock.lock(); + try { + doCheckForWork.signalAll(); + } finally { + workLock.unlock(); + } + } + + public static void run() + { + try { + while ( !Global.doShutdown ) { + workLock.lock(); + try { + doCheckForWork.await( 1, TimeUnit.MINUTES ); + } finally { + workLock.unlock(); + } + try { + for ( Iterator it = instances.values().iterator(); it.hasNext(); ) { + AbstractTask task = it.next(); + if ( task.canBeReleased() ) { + it.remove(); + log.debug( "Released task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" ); + continue; + } + if ( task.canStart() ) { + threadPool.execute( task ); + log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" ); + } + } + } catch ( RejectedExecutionException e ) { + log.warn( "ThreadPool rejected a task (" + e.getMessage() + ")" ); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + log.info( "Taskmanager mainloop finished." ); + Global.doShutdown = true; + log.info( "Shutting down worker thread pool...." ); + threadPool.shutdown(); + try { + if ( threadPool.awaitTermination( 5, TimeUnit.MINUTES ) ) { + log.info( "Thread pool shut down!" ); + } else { + log.info( "Trying to kill still running tasks...." ); + threadPool.shutdownNow(); + } + } catch ( InterruptedException e ) { + log.info( "Interrupted!" ); + } + } + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java new file mode 100644 index 0000000..3e2c8fd --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/NetworkHandler.java @@ -0,0 +1,180 @@ +package org.openslx.taskmanager.network; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.Global; + +/** + * The network listener that will receive incoming UDP packets, try to process + * them, and then send a reply. + */ +public class NetworkHandler implements Runnable +{ + + private static final Logger log = Logger.getLogger( NetworkHandler.class ); + + // Static part + + private static Thread recvThread = null; + private static Thread sendThread = null; + /** + * Sender instance (Runnable handling outgoing packets) + */ + private static Sender sender = null; + /** + * UDP socket for sending and receiving. + */ + private static DatagramSocket socket; + + /** + * Initialize the NetworkHandler by starting threads and opening the socket. + */ + public static void init() throws SocketException + { + if ( recvThread != null ) + throw new RuntimeException( "Already initialized" ); + socket = new DatagramSocket( Global.LISTEN_PORT, Global.LISTEN_ADDRESS ); + recvThread = new Thread( new NetworkHandler() ); + recvThread.start(); + sendThread = new Thread( sender = new Sender() ); + sendThread.start(); + } + + public static void shutdown() + { + socket.close(); + } + + public static void join() + { + try { + recvThread.join(); + sendThread.join(); + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } + } + + // Class part + + /** + * Prepare and enqueue reply for client request. + * Only ever to be called from the receiving thread. The reply message is crafted + * and then handed over to the sending thread. + * + * @param destination SocketAddress of the client + * @param messageId The same ID the client used in it's request. + * It's echoed back to the client to enable request bursts, and has no meaning for the + * server. + * @param status A TaskStatus instance to be serialized to json and sent to the client. + */ + private void send( SocketAddress destination, byte[] buffer ) + { + final DatagramPacket packet; + try { + packet = new DatagramPacket( buffer, buffer.length, destination ); + } catch ( SocketException e ) { + log.warn( "Could not construct datagram packet for target " + destination.toString() ); + e.printStackTrace(); + return; + } + sender.send( packet ); + } + + /** + * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode + */ + @Override + public void run() + { + byte readBuffer[] = new byte[ 66000 ]; + try { + while ( !Global.doShutdown ) { + DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length ); + try { + socket.receive( packet ); + } catch ( IOException e ) { + log.info( "IOException on UDP socket when reading: " + e.getMessage() ); + Thread.sleep( 100 ); + continue; + } + if ( packet.getLength() < 2 ) { + log.debug( "Message too short" ); + continue; + } + String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 ); + try { + byte[] reply = RequestParser.handle( payload ); + if ( reply != null ) + send( packet.getSocketAddress(), reply ); + } catch ( Throwable t ) { + log.error( "Exception in RequestParser: " + t.getMessage() ); + t.printStackTrace(); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + Global.doShutdown = true; + log.info( "UDP receiver finished." ); + } + } + + /** + * Private sending thread. + * Use blocking queue, wait for packet to be added to it, then try to send. + */ + static class Sender implements Runnable + { + + /** + * Queue to stuff outgoing packets into. + */ + private final BlockingQueue queue = new LinkedBlockingQueue<>( 128 ); + + /** + * Wait until something is put into the queue, then send it. + */ + @Override + public void run() + { + try { + while ( !Global.doShutdown ) { + final DatagramPacket packet; + packet = queue.take(); + try { + socket.send( packet ); + } catch ( IOException e ) { + log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString() ); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + Global.doShutdown = true; + log.info( "UDP sender finished." ); + } + } + + /** + * Add something to the outgoing packet queue. + * Called from the receiving thread. + */ + public void send( DatagramPacket packet ) + { + if ( queue.offer( packet ) ) + return; + log.warn( "Could not add packet to queue: Full" ); + } + + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java new file mode 100644 index 0000000..d2cfb21 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java @@ -0,0 +1,66 @@ +package org.openslx.taskmanager.network; + +import java.nio.charset.StandardCharsets; + +import org.apache.log4j.Logger; +import org.openslx.taskmanager.api.TaskStatus; +import org.openslx.taskmanager.main.Taskmanager; + +import com.google.gson.Gson; + +public class RequestParser +{ + private static final Logger log = Logger.getLogger( RequestParser.class ); + + /** + * Our very own gson instance (for serializing replies) + */ + private static final Gson sendGson = new Gson(); + + /** + * Handle the given unparsed request. + * + * @param source source of the request, where the reply will be send to (if any) + * @param payload Packet data received from network, already converted to a string + */ + public static byte[] handle( String payload ) + { + String[] parts = payload.split( " *, *", 3 ); + // Message format is ", , " + if ( parts.length != 3 ) { + log.debug( "Could not split message" ); + return null; + } + // Look at parts[1], if it's "status" it's a request for the task + // with the ID given in parts[2] + if ( parts[1].equals( "status" ) ) { + TaskStatus status = Taskmanager.getTaskStatus( parts[2] ); + return serialize( parts[0], status ); + } + // Now check if parts[1] is "release" + if ( parts[1].equals( "release" ) ) { + Taskmanager.releaseTask( parts[2] ); + return null; + } + // Anything else in parts[0] will be treated as a fresh task invocation, so let's + // pass it on to the task manager. + TaskStatus status = Taskmanager.submitTask( parts[1], parts[2] ); + return serialize( parts[0], status ); + } + + private static byte[] serialize( String messageId, TaskStatus status ) + { + String data; + try { + synchronized ( sendGson ) { + data = sendGson.toJson( status ); + } + } catch ( Throwable e ) { + log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName() ); + log.warn( e.toString() ); + return null; + } + return ( messageId + ',' + data ).getBytes( StandardCharsets.UTF_8 ); + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java b/daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java new file mode 100644 index 0000000..1a02ff7 --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/util/ClassLoaderHack.java @@ -0,0 +1,66 @@ +package org.openslx.taskmanager.util; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; + +public class ClassLoaderHack +{ + @SuppressWarnings( "rawtypes" ) + private static final Class[] parameters = new Class[] { URL.class }; + + public static void addFile( String s ) throws IOException + { + File f = new File( s ); + addFile( f ); + } + + public static void addFile( File f ) throws IOException + { + addURL( f.toURI().toURL() ); + } + + public static void addURL( URL u ) throws IOException + { + URLClassLoader sysloader = (URLClassLoader)ClassLoader.getSystemClassLoader(); + Class sysclass = URLClassLoader.class; + + try { + Method method = sysclass.getDeclaredMethod( "addURL", parameters ); + method.setAccessible( true ); + method.invoke( sysloader, new Object[] { u } ); + System.out.println( "Loaded " + u.toString() ); + } catch ( Throwable t ) { + t.printStackTrace(); + throw new IOException( "Error, could not add URL to system classloader" ); + } + + } + + /** + * Get Class meta-object for given class in package. Only return class if it's somehow + * extending from given baseClass. + * + * @param packageName package to search in + * @param className name of class to look for + * @param baseClass class the class in question has to be extended from + * @return class meta object, or null if not found + */ + @SuppressWarnings( "unchecked" ) + public static Class getClass( String packageName, String className, Class baseClass ) + { + final Class clazz; + try { + clazz = Class.forName( packageName + '.' + className, true, ClassLoader.getSystemClassLoader() ); + } catch ( ClassNotFoundException e ) { + return null; + } + if ( clazz == null || ( baseClass != null && !baseClass.isAssignableFrom( clazz ) ) ) { + return null; + } + return (Class)clazz; + } + +} diff --git a/daemon/src/main/java/org/openslx/taskmanager/util/Util.java b/daemon/src/main/java/org/openslx/taskmanager/util/Util.java new file mode 100644 index 0000000..bf52ecb --- /dev/null +++ b/daemon/src/main/java/org/openslx/taskmanager/util/Util.java @@ -0,0 +1,24 @@ +package org.openslx.taskmanager.util; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +public class Util +{ + + private static GsonBuilder gsonBuilder = new GsonBuilder(); + + /** + * Small helper to create a gson instance that will only handle class members with the + * "@Exposed" annotation. Decided against the default of explicitly excluding fields by + * making them transient, as you might easily forget to exclude an important field, which + * can in turn be a security issue. + * + * @return Gson instance + */ + public static Gson explicitGsonInstance() + { + return gsonBuilder.excludeFieldsWithoutExposeAnnotation().create(); + } + +} diff --git a/daemon/src/test/java/org/openslx/taskmanager/AppTest.java b/daemon/src/test/java/org/openslx/taskmanager/AppTest.java new file mode 100644 index 0000000..feac8e7 --- /dev/null +++ b/daemon/src/test/java/org/openslx/taskmanager/AppTest.java @@ -0,0 +1,38 @@ +package org.openslx.taskmanager; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} -- cgit v1.2.3-55-g7522