summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2015-09-28 18:01:35 +0200
committerSimon Rettberg2015-09-28 18:01:35 +0200
commit4c974ca21d544634097b4d287c0f98bfacdefc55 (patch)
treed31f33bcc77919e8f6a99661f2b62a873390f457
parentSwitch to semaphore for signalling the mainloop to check for work (diff)
downloadtaskman-lite-4c974ca21d544634097b4d287c0f98bfacdefc55.tar.gz
taskman-lite-4c974ca21d544634097b4d287c0f98bfacdefc55.tar.xz
taskman-lite-4c974ca21d544634097b4d287c0f98bfacdefc55.zip
Handle some uncaught runtime errors, fix race condition
-rw-r--r--api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java18
-rw-r--r--api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java85
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java5
3 files changed, 88 insertions, 20 deletions
diff --git a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
index 271a8bd..b850c2c 100644
--- a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
+++ b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
@@ -67,6 +67,11 @@ public abstract class AbstractTask implements Runnable
* work when we're done.
*/
private volatile FinishCallback finishCallback = null;
+
+ /**
+ * Set to true as soon as we try to start this task using the thread pool executor.
+ */
+ private volatile boolean triedToStart = false;
/**
* Default constructor which should not be overridden
@@ -252,13 +257,22 @@ public abstract class AbstractTask implements Runnable
*/
public final boolean canStart()
{
- if ( !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) {
+ if ( this.triedToStart || !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) {
return false;
}
if ( this.parent == null )
return true;
return parent.getStatusCode() != StatusCode.TASK_WAITING && parent.getStatusCode() != StatusCode.TASK_PROCESSING;
}
+
+ /**
+ * Mark this task as being started to prevent a race condition where
+ * the task would be submitted to the thread pool more than once.
+ */
+ public final void markAsStarting()
+ {
+ this.triedToStart = true;
+ }
/**
* Checks whether this task can be removed from the task manager.
@@ -291,7 +305,7 @@ public abstract class AbstractTask implements Runnable
try {
ret = execute();
} catch ( Throwable t ) {
- LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception: " + t.toString() );
+ LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception", t );
ret = false;
}
if ( ret ) {
diff --git a/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java b/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java
index ed47336..d5a3b64 100644
--- a/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java
+++ b/api/src/main/java/org/openslx/taskmanager/api/SystemCommandTask.java
@@ -17,9 +17,22 @@ public abstract class SystemCommandTask extends AbstractTask
private Process process = null;
+ protected int timeoutSeconds = 0;
+
@Override
protected final boolean execute()
{
+ try {
+ return execInternal();
+ } catch ( Exception e ) {
+ log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() );
+ processStdErrInternal( e.toString() );
+ return processEnded( -3 );
+ }
+ }
+
+ private final boolean execInternal()
+ {
command = initCommandLine();
if ( command == null || command.length == 0 ) {
return processEnded( -1 );
@@ -31,7 +44,13 @@ public abstract class SystemCommandTask extends AbstractTask
try {
// Create process
- process = pb.start();
+ try {
+ process = pb.start();
+ } catch ( Exception e ) {
+ log.warn( "Process of task " + getId() + " died." );
+ processStdErrInternal( e.toString() );
+ return processEnded( -2 );
+ }
final Process p = process;
processStarted();
p.getOutputStream();
@@ -46,10 +65,11 @@ public abstract class SystemCommandTask extends AbstractTask
String line;
while ( ( line = reader.readLine() ) != null ) {
synchronized ( p ) {
- processStdOut( line );
+ processStdOutInternal( line );
}
}
} catch ( Exception e ) {
+ e.printStackTrace();
}
}
} );
@@ -63,10 +83,11 @@ public abstract class SystemCommandTask extends AbstractTask
String line;
while ( ( line = reader.readLine() ) != null ) {
synchronized ( p ) {
- processStdErr( line );
+ processStdErrInternal( line );
}
}
} catch ( Exception e ) {
+ e.printStackTrace();
}
}
} );
@@ -75,7 +96,30 @@ public abstract class SystemCommandTask extends AbstractTask
stderr.start();
// Wait for everything
- process.waitFor();
+ int retval = -1;
+ if ( this.timeoutSeconds <= 0 ) {
+ retval = process.waitFor();
+ } else {
+ int togo = timeoutSeconds * 10;
+ while ( togo-- > 0 ) {
+ try {
+ retval = process.exitValue();
+ break;
+ } catch ( IllegalThreadStateException e1 ) {
+ // Still running....
+ try {
+ Thread.sleep( 100 );
+ } catch ( Exception e2 ) {
+ // Bummer....
+ }
+ }
+ }
+ }
+ try {
+ stdout.join( 500 );
+ stderr.join( 500 );
+ } catch ( Throwable t ) {
+ }
try {
process.getErrorStream().close();
} catch ( Throwable t ) {
@@ -84,22 +128,15 @@ public abstract class SystemCommandTask extends AbstractTask
process.getOutputStream().close();
} catch ( Throwable t ) {
}
- stdout.join( 2000 );
- stderr.join( 2000 );
- return processEnded( process.exitValue() );
+ synchronized ( p ) {
+ return processEnded( retval );
+ }
- } catch ( IOException e ) {
- log.warn( "Process of task " + getId() + " died." );
- processStdErr( e.toString() );
- return processEnded( -2 );
} catch ( InterruptedException e ) {
+ processEnded( -4 );
Thread.currentThread().interrupt();
return false;
- } catch ( Exception e ) {
- log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() );
- processStdErr( e.toString() );
- return processEnded( -3 );
} finally {
if ( process != null )
process.destroy();
@@ -134,6 +171,24 @@ public abstract class SystemCommandTask extends AbstractTask
return toStdIn( text.getBytes( StandardCharsets.UTF_8 ) );
}
+ private final void processStdOutInternal( String line )
+ {
+ try {
+ processStdOut( line );
+ } catch ( Throwable t ) {
+ log.warn( "processStdOut failed", t );
+ }
+ }
+
+ private final void processStdErrInternal( String line )
+ {
+ try {
+ processStdErr( line );
+ } catch ( Throwable t ) {
+ log.warn( "processStdErr failed", t );
+ }
+ }
+
/**
* Called to get the command line. Each argument should be a separate array
* element. Returning null means the task should not run (as the arguments
diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
index 1325dca..69c190a 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
@@ -38,9 +38,7 @@ public class Taskmanager implements FinishCallback, Runnable
private final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();
/**
- * All the running/finished task instances. The mainloop will call wait() on this and this object
- * is notified as soon as the mainloop should check if there is any task available that can be
- * run.
+ * All the running/finished task instances.
*/
private final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();
@@ -166,6 +164,7 @@ public class Taskmanager implements FinishCallback, Runnable
if ( task.canStart() ) {
log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
threadPool.execute( task );
+ task.markAsStarting();
}
}
} catch ( RejectedExecutionException e ) {