summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2016-12-01 15:13:57 +0100
committerSimon Rettberg2016-12-01 15:13:57 +0100
commit617fb197e7e5a7be033fb5114bc72669f440da27 (patch)
tree843ce0270c86e393de3873131e574bf069d818f8
parentDatagramPacket constructor doesn't throw SocketException in SE 8+ (diff)
downloadtaskman-lite-617fb197e7e5a7be033fb5114bc72669f440da27.tar.gz
taskman-lite-617fb197e7e5a7be033fb5114bc72669f440da27.tar.xz
taskman-lite-617fb197e7e5a7be033fb5114bc72669f440da27.zip
Add cancellable tasks, optional response payload compression
-rw-r--r--api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java2
-rw-r--r--api/src/main/java/org/openslx/taskmanager/api/CancellableTask.java15
-rw-r--r--api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java10
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java12
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java78
5 files changed, 107 insertions, 10 deletions
diff --git a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
index b850c2c..d284ecf 100644
--- a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
+++ b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java
@@ -310,6 +310,8 @@ public abstract class AbstractTask implements Runnable
}
if ( ret ) {
this.status.statusCode = StatusCode.TASK_FINISHED;
+ } else if ( this.status.statusCode == StatusCode.TASK_CANCELLING ) {
+ this.status.statusCode = StatusCode.TASK_CANCELLED;
} else {
this.status.statusCode = StatusCode.TASK_ERROR;
}
diff --git a/api/src/main/java/org/openslx/taskmanager/api/CancellableTask.java b/api/src/main/java/org/openslx/taskmanager/api/CancellableTask.java
new file mode 100644
index 0000000..fa27091
--- /dev/null
+++ b/api/src/main/java/org/openslx/taskmanager/api/CancellableTask.java
@@ -0,0 +1,15 @@
+package org.openslx.taskmanager.api;
+
+/**
+ * Implement this if you want your task to be cancellable.
+ * After cancel has been called, your task should try to clean up and leave
+ * its execute() method in a timely fashion.
+ * The execute() method must return false in that case (unless
+ * it could still do everything it was supposed to)
+ */
+public interface CancellableTask
+{
+
+ public void cancel();
+
+}
diff --git a/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java
index 0aa9ef7..edf7bce 100644
--- a/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java
+++ b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java
@@ -15,12 +15,15 @@ public final class TaskStatus
TASK_PROCESSING,
TASK_FINISHED,
TASK_ERROR,
+ TASK_CANCELLING,
+ TASK_CANCELLED,
+ NOT_CANCELLABLE,
NO_SUCH_INSTANCE,
NO_SUCH_TASK,
NO_SUCH_CONSTRUCTOR,
DUPLICATE_ID,
PARENT_FAILED,
- JSON_ERROR
+ JSON_ERROR,
}
/**
@@ -77,6 +80,11 @@ public final class TaskStatus
* Create a single "json error" status we can use everywhere.
*/
public static final TaskStatus ts_jsonError = new TaskStatus( StatusCode.JSON_ERROR );
+
+ /**
+ * Create a single "not cancellable" status we can use everywhere.
+ */
+ public static final TaskStatus ts_notCancellable = new TaskStatus( StatusCode.NOT_CANCELLABLE );
/**
* Create new TaskStatus with given initial status code
diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
index 69c190a..b4df03d 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
@@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
import org.openslx.taskmanager.api.AbstractTask;
+import org.openslx.taskmanager.api.CancellableTask;
import org.openslx.taskmanager.api.FinishCallback;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.util.ClassLoaderHack;
@@ -131,6 +132,17 @@ public class Taskmanager implements FinishCallback, Runnable
task.release();
}
+ public TaskStatus cancelTask( String taskId )
+ {
+ final AbstractTask task = instances.get( taskId );
+ if ( task == null )
+ return TaskStatus.ts_noSuchInstance;
+ if ( !(task instanceof CancellableTask) )
+ return TaskStatus.ts_notCancellable;
+ ( (CancellableTask)task ).cancel();
+ return task.getStatus();
+ }
+
/**
* Wakes up the Taskmanager's mainloop so it will check if any of the current task instances
* is waiting for execution.
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
index 5741e58..2d46c5d 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
@@ -1,6 +1,7 @@
package org.openslx.taskmanager.network;
import java.nio.charset.StandardCharsets;
+import java.util.zip.Deflater;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.api.TaskStatus;
@@ -17,6 +18,11 @@ public class RequestParser
*/
private final Gson sendGson = new Gson();
+ /**
+ * Same for the deflater
+ */
+ private final Deflater deflater = new Deflater( Deflater.BEST_COMPRESSION, true );
+
private final Taskmanager taskManager;
public RequestParser( Taskmanager tm )
@@ -31,42 +37,96 @@ public class RequestParser
*/
public byte[] handle( String payload )
{
- String[] parts = payload.split( " *, *", 3 );
+ String[] parts = payload.split( ",", 3 );
// Message format is "<message id>, <command>, <command payload/argument>"
if ( parts.length != 3 ) {
log.debug( "Could not split message" );
return null;
}
+ // Hacked in compression support: If parts[1] is prefixed by at
+ // least 5 spaces, the sender supports deflated reply payload.
+ boolean compress = parts[1].startsWith( " " );
+ parts[0] = parts[0].trim();
+ parts[1] = parts[1].trim();
+ parts[2] = parts[2].trim();
// Look at parts[1], if it's "status" it's a request for the task
// with the ID given in parts[2]
if ( parts[1].equals( "status" ) ) {
TaskStatus status = taskManager.getTaskStatus( parts[2] );
- return serialize( parts[0], status );
+ return serialize( parts[0], status, compress );
}
// Now check if parts[1] is "release"
if ( parts[1].equals( "release" ) ) {
taskManager.releaseTask( parts[2] );
return null;
}
+ // Cancel task if parts[1] is "cancel"
+ if ( parts[1].equals( "cancel" ) ) {
+ taskManager.cancelTask( parts[2] );
+ return null;
+ }
// Anything else in parts[0] will be treated as a fresh task invocation, so let's
// pass it on to the task manager.
TaskStatus status = taskManager.submitTask( parts[1], parts[2] );
- return serialize( parts[0], status );
+ return serialize( parts[0], status, compress );
}
- private byte[] serialize( String messageId, TaskStatus status )
+ private static final byte[] COMMA = { ',' };
+ private static final byte[] COMPRESS_PREFIX = { '+', 'z', ':' };
+
+ private byte[] serialize( String messageId, TaskStatus status, boolean compress )
{
- String data;
+ byte[] messageIdBytes = messageId.getBytes( StandardCharsets.UTF_8 );
+ byte[] dataBytes;
try {
synchronized ( sendGson ) {
- data = sendGson.toJson( status );
+ dataBytes = sendGson.toJson( status ).getBytes( StandardCharsets.UTF_8 );
+ }
+ if ( compress && dataBytes.length < 1400 ) {
+ // Not worth the effort for small payloads
+ compress = false;
+ }
+ if ( compress ) {
+ byte[] compressed = new byte[ dataBytes.length + 16 ];
+ int compressedLen;
+ synchronized ( deflater ) {
+ deflater.setInput( dataBytes );
+ deflater.finish();
+ compressedLen = deflater.deflate( compressed );
+ deflater.reset();
+ }
+ if ( compressedLen != 0 && compressedLen < dataBytes.length ) {
+ log.debug( "Sending compressed reply (" + dataBytes.length + " -> " + compressedLen + ")" );
+ dataBytes = new byte[ compressedLen ];
+ System.arraycopy( compressed, 0, dataBytes, 0, compressedLen );
+ } else {
+ // Failed or actually larger, disable compression
+ compress = false;
+ }
}
} catch ( Throwable e ) {
- log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName() );
- log.warn( e.toString() );
+ log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName(), e );
return null;
}
- return ( messageId + ',' + data ).getBytes( StandardCharsets.UTF_8 );
+ if ( compress ) {
+ return concat( COMPRESS_PREFIX, messageIdBytes, COMMA, dataBytes );
+ }
+ return concat( messageIdBytes, COMMA, dataBytes );
+ }
+
+ private byte[] concat( byte[]... arrays )
+ {
+ int len = 0;
+ for ( int i = 0; i < arrays.length; ++i ) {
+ len += arrays[i].length;
+ }
+ byte[] ret = new byte[ len ];
+ int pos = 0;
+ for ( int i = 0; i < arrays.length; ++i ) {
+ System.arraycopy( arrays[i], 0, ret, pos, arrays[i].length );
+ pos += arrays[i].length;
+ }
+ return ret;
}
}