From 617fb197e7e5a7be033fb5114bc72669f440da27 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Thu, 1 Dec 2016 15:13:57 +0100 Subject: Add cancellable tasks, optional response payload compression --- .../org/openslx/taskmanager/api/AbstractTask.java | 2 + .../openslx/taskmanager/api/CancellableTask.java | 15 +++++ .../org/openslx/taskmanager/api/TaskStatus.java | 10 ++- .../org/openslx/taskmanager/main/Taskmanager.java | 12 ++++ .../openslx/taskmanager/network/RequestParser.java | 78 +++++++++++++++++++--- 5 files changed, 107 insertions(+), 10 deletions(-) create mode 100644 api/src/main/java/org/openslx/taskmanager/api/CancellableTask.java diff --git a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java index b850c2c..d284ecf 100644 --- a/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java +++ b/api/src/main/java/org/openslx/taskmanager/api/AbstractTask.java @@ -310,6 +310,8 @@ public abstract class AbstractTask implements Runnable } if ( ret ) { this.status.statusCode = StatusCode.TASK_FINISHED; + } else if ( this.status.statusCode == StatusCode.TASK_CANCELLING ) { + this.status.statusCode = StatusCode.TASK_CANCELLED; } else { this.status.statusCode = StatusCode.TASK_ERROR; } diff --git a/api/src/main/java/org/openslx/taskmanager/api/CancellableTask.java b/api/src/main/java/org/openslx/taskmanager/api/CancellableTask.java new file mode 100644 index 0000000..fa27091 --- /dev/null +++ b/api/src/main/java/org/openslx/taskmanager/api/CancellableTask.java @@ -0,0 +1,15 @@ +package org.openslx.taskmanager.api; + +/** + * Implement this if you want your task to be cancellable. + * After cancel has been called, your task should try to clean up and leave + * its execute() method in a timely fashion. + * The execute() method must return false in that case (unless + * it could still do everything it was supposed to) + */ +public interface CancellableTask +{ + + public void cancel(); + +} diff --git a/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java index 0aa9ef7..edf7bce 100644 --- a/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java +++ b/api/src/main/java/org/openslx/taskmanager/api/TaskStatus.java @@ -15,12 +15,15 @@ public final class TaskStatus TASK_PROCESSING, TASK_FINISHED, TASK_ERROR, + TASK_CANCELLING, + TASK_CANCELLED, + NOT_CANCELLABLE, NO_SUCH_INSTANCE, NO_SUCH_TASK, NO_SUCH_CONSTRUCTOR, DUPLICATE_ID, PARENT_FAILED, - JSON_ERROR + JSON_ERROR, } /** @@ -77,6 +80,11 @@ public final class TaskStatus * Create a single "json error" status we can use everywhere. */ public static final TaskStatus ts_jsonError = new TaskStatus( StatusCode.JSON_ERROR ); + + /** + * Create a single "not cancellable" status we can use everywhere. + */ + public static final TaskStatus ts_notCancellable = new TaskStatus( StatusCode.NOT_CANCELLABLE ); /** * Create new TaskStatus with given initial status code diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java index 69c190a..b4df03d 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java +++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java @@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.openslx.taskmanager.Global; import org.openslx.taskmanager.api.AbstractTask; +import org.openslx.taskmanager.api.CancellableTask; import org.openslx.taskmanager.api.FinishCallback; import org.openslx.taskmanager.api.TaskStatus; import org.openslx.taskmanager.util.ClassLoaderHack; @@ -131,6 +132,17 @@ public class Taskmanager implements FinishCallback, Runnable task.release(); } + public TaskStatus cancelTask( String taskId ) + { + final AbstractTask task = instances.get( taskId ); + if ( task == null ) + return TaskStatus.ts_noSuchInstance; + if ( !(task instanceof CancellableTask) ) + return TaskStatus.ts_notCancellable; + ( (CancellableTask)task ).cancel(); + return task.getStatus(); + } + /** * Wakes up the Taskmanager's mainloop so it will check if any of the current task instances * is waiting for execution. diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java index 5741e58..2d46c5d 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java +++ b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java @@ -1,6 +1,7 @@ package org.openslx.taskmanager.network; import java.nio.charset.StandardCharsets; +import java.util.zip.Deflater; import org.apache.log4j.Logger; import org.openslx.taskmanager.api.TaskStatus; @@ -17,6 +18,11 @@ public class RequestParser */ private final Gson sendGson = new Gson(); + /** + * Same for the deflater + */ + private final Deflater deflater = new Deflater( Deflater.BEST_COMPRESSION, true ); + private final Taskmanager taskManager; public RequestParser( Taskmanager tm ) @@ -31,42 +37,96 @@ public class RequestParser */ public byte[] handle( String payload ) { - String[] parts = payload.split( " *, *", 3 ); + String[] parts = payload.split( ",", 3 ); // Message format is ", , " if ( parts.length != 3 ) { log.debug( "Could not split message" ); return null; } + // Hacked in compression support: If parts[1] is prefixed by at + // least 5 spaces, the sender supports deflated reply payload. + boolean compress = parts[1].startsWith( " " ); + parts[0] = parts[0].trim(); + parts[1] = parts[1].trim(); + parts[2] = parts[2].trim(); // Look at parts[1], if it's "status" it's a request for the task // with the ID given in parts[2] if ( parts[1].equals( "status" ) ) { TaskStatus status = taskManager.getTaskStatus( parts[2] ); - return serialize( parts[0], status ); + return serialize( parts[0], status, compress ); } // Now check if parts[1] is "release" if ( parts[1].equals( "release" ) ) { taskManager.releaseTask( parts[2] ); return null; } + // Cancel task if parts[1] is "cancel" + if ( parts[1].equals( "cancel" ) ) { + taskManager.cancelTask( parts[2] ); + return null; + } // Anything else in parts[0] will be treated as a fresh task invocation, so let's // pass it on to the task manager. TaskStatus status = taskManager.submitTask( parts[1], parts[2] ); - return serialize( parts[0], status ); + return serialize( parts[0], status, compress ); } - private byte[] serialize( String messageId, TaskStatus status ) + private static final byte[] COMMA = { ',' }; + private static final byte[] COMPRESS_PREFIX = { '+', 'z', ':' }; + + private byte[] serialize( String messageId, TaskStatus status, boolean compress ) { - String data; + byte[] messageIdBytes = messageId.getBytes( StandardCharsets.UTF_8 ); + byte[] dataBytes; try { synchronized ( sendGson ) { - data = sendGson.toJson( status ); + dataBytes = sendGson.toJson( status ).getBytes( StandardCharsets.UTF_8 ); + } + if ( compress && dataBytes.length < 1400 ) { + // Not worth the effort for small payloads + compress = false; + } + if ( compress ) { + byte[] compressed = new byte[ dataBytes.length + 16 ]; + int compressedLen; + synchronized ( deflater ) { + deflater.setInput( dataBytes ); + deflater.finish(); + compressedLen = deflater.deflate( compressed ); + deflater.reset(); + } + if ( compressedLen != 0 && compressedLen < dataBytes.length ) { + log.debug( "Sending compressed reply (" + dataBytes.length + " -> " + compressedLen + ")" ); + dataBytes = new byte[ compressedLen ]; + System.arraycopy( compressed, 0, dataBytes, 0, compressedLen ); + } else { + // Failed or actually larger, disable compression + compress = false; + } } } catch ( Throwable e ) { - log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName() ); - log.warn( e.toString() ); + log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName(), e ); return null; } - return ( messageId + ',' + data ).getBytes( StandardCharsets.UTF_8 ); + if ( compress ) { + return concat( COMPRESS_PREFIX, messageIdBytes, COMMA, dataBytes ); + } + return concat( messageIdBytes, COMMA, dataBytes ); + } + + private byte[] concat( byte[]... arrays ) + { + int len = 0; + for ( int i = 0; i < arrays.length; ++i ) { + len += arrays[i].length; + } + byte[] ret = new byte[ len ]; + int pos = 0; + for ( int i = 0; i < arrays.length; ++i ) { + System.arraycopy( arrays[i], 0, ret, pos, arrays[i].length ); + pos += arrays[i].length; + } + return ret; } } -- cgit v1.2.3-55-g7522