diff options
3 files changed, 88 insertions, 20 deletions
diff --git a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java index 271a8bd..b850c2c 100644 --- a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java +++ b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java @@ -67,6 +67,11 @@ public abstract class AbstractTask implements Runnable * work when we're done. */ private volatile FinishCallback finishCallback = null; + + /** + * Set to true as soon as we try to start this task using the thread pool executor. + */ + private volatile boolean triedToStart = false; /** * Default constructor which should not be overridden @@ -252,13 +257,22 @@ public abstract class AbstractTask implements Runnable */ public final boolean canStart() { - if ( !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) { + if ( this.triedToStart || !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) { return false; } if ( this.parent == null ) return true; return parent.getStatusCode() != StatusCode.TASK_WAITING && parent.getStatusCode() != StatusCode.TASK_PROCESSING; } + + /** + * Mark this task as being started to prevent a race condition where + * the task would be submitted to the thread pool more than once. + */ + public final void markAsStarting() + { + this.triedToStart = true; + } /** * Checks whether this task can be removed from the task manager. @@ -291,7 +305,7 @@ public abstract class AbstractTask implements Runnable try { ret = execute(); } catch ( Throwable t ) { - LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception: " + t.toString() ); + LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception", t ); ret = false; } if ( ret ) { diff --git a/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java b/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java index ed47336..d5a3b64 100644 --- a/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java +++ b/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java @@ -17,9 +17,22 @@ public abstract class SystemCommandTask extends AbstractTask private Process process = null; + protected int timeoutSeconds = 0; + @Override protected final boolean execute() { + try { + return execInternal(); + } catch ( Exception e ) { + log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() ); + processStdErrInternal( e.toString() ); + return processEnded( -3 ); + } + } + + private final boolean execInternal() + { command = initCommandLine(); if ( command == null || command.length == 0 ) { return processEnded( -1 ); @@ -31,7 +44,13 @@ public abstract class SystemCommandTask extends AbstractTask try { // Create process - process = pb.start(); + try { + process = pb.start(); + } catch ( Exception e ) { + log.warn( "Process of task " + getId() + " died." ); + processStdErrInternal( e.toString() ); + return processEnded( -2 ); + } final Process p = process; processStarted(); p.getOutputStream(); @@ -46,10 +65,11 @@ public abstract class SystemCommandTask extends AbstractTask String line; while ( ( line = reader.readLine() ) != null ) { synchronized ( p ) { - processStdOut( line ); + processStdOutInternal( line ); } } } catch ( Exception e ) { + e.printStackTrace(); } } } ); @@ -63,10 +83,11 @@ public abstract class SystemCommandTask extends AbstractTask String line; while ( ( line = reader.readLine() ) != null ) { synchronized ( p ) { - processStdErr( line ); + processStdErrInternal( line ); } } } catch ( Exception e ) { + e.printStackTrace(); } } } ); @@ -75,7 +96,30 @@ public abstract class SystemCommandTask extends AbstractTask stderr.start(); // Wait for everything - process.waitFor(); + int retval = -1; + if ( this.timeoutSeconds <= 0 ) { + retval = process.waitFor(); + } else { + int togo = timeoutSeconds * 10; + while ( togo-- > 0 ) { + try { + retval = process.exitValue(); + break; + } catch ( IllegalThreadStateException e1 ) { + // Still running.... + try { + Thread.sleep( 100 ); + } catch ( Exception e2 ) { + // Bummer.... + } + } + } + } + try { + stdout.join( 500 ); + stderr.join( 500 ); + } catch ( Throwable t ) { + } try { process.getErrorStream().close(); } catch ( Throwable t ) { @@ -84,22 +128,15 @@ public abstract class SystemCommandTask extends AbstractTask process.getOutputStream().close(); } catch ( Throwable t ) { } - stdout.join( 2000 ); - stderr.join( 2000 ); - return processEnded( process.exitValue() ); + synchronized ( p ) { + return processEnded( retval ); + } - } catch ( IOException e ) { - log.warn( "Process of task " + getId() + " died." ); - processStdErr( e.toString() ); - return processEnded( -2 ); } catch ( InterruptedException e ) { + processEnded( -4 ); Thread.currentThread().interrupt(); return false; - } catch ( Exception e ) { - log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() ); - processStdErr( e.toString() ); - return processEnded( -3 ); } finally { if ( process != null ) process.destroy(); @@ -134,6 +171,24 @@ public abstract class SystemCommandTask extends AbstractTask return toStdIn( text.getBytes( StandardCharsets.UTF_8 ) ); } + private final void processStdOutInternal( String line ) + { + try { + processStdOut( line ); + } catch ( Throwable t ) { + log.warn( "processStdOut failed", t ); + } + } + + private final void processStdErrInternal( String line ) + { + try { + processStdErr( line ); + } catch ( Throwable t ) { + log.warn( "processStdErr failed", t ); + } + } + /** * Called to get the command line. Each argument should be a separate array * element. Returning null means the task should not run (as the arguments diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java index 1325dca..69c190a 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java +++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java @@ -38,9 +38,7 @@ public class Taskmanager implements FinishCallback, Runnable private final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>(); /** - * All the running/finished task instances. The mainloop will call wait() on this and this object - * is notified as soon as the mainloop should check if there is any task available that can be - * run. + * All the running/finished task instances. */ private final Map<String, AbstractTask> instances = new ConcurrentHashMap<>(); @@ -166,6 +164,7 @@ public class Taskmanager implements FinishCallback, Runnable if ( task.canStart() ) { log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" ); threadPool.execute( task ); + task.markAsStarting(); } } } catch ( RejectedExecutionException e ) { |