diff options
author | Simon Rettberg | 2016-12-01 15:13:57 +0100 |
---|---|---|
committer | Simon Rettberg | 2016-12-01 15:13:57 +0100 |
commit | 617fb197e7e5a7be033fb5114bc72669f440da27 (patch) | |
tree | 843ce0270c86e393de3873131e574bf069d818f8 /daemon | |
parent | DatagramPacket constructor doesn't throw SocketException in SE 8+ (diff) | |
download | taskman-lite-617fb197e7e5a7be033fb5114bc72669f440da27.tar.gz taskman-lite-617fb197e7e5a7be033fb5114bc72669f440da27.tar.xz taskman-lite-617fb197e7e5a7be033fb5114bc72669f440da27.zip |
Add cancellable tasks, optional response payload compression
Diffstat (limited to 'daemon')
-rw-r--r-- | daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java | 12 | ||||
-rw-r--r-- | daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java | 78 |
2 files changed, 81 insertions, 9 deletions
diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java index 69c190a..b4df03d 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java +++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java @@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.openslx.taskmanager.Global; import org.openslx.taskmanager.api.AbstractTask; +import org.openslx.taskmanager.api.CancellableTask; import org.openslx.taskmanager.api.FinishCallback; import org.openslx.taskmanager.api.TaskStatus; import org.openslx.taskmanager.util.ClassLoaderHack; @@ -131,6 +132,17 @@ public class Taskmanager implements FinishCallback, Runnable task.release(); } + public TaskStatus cancelTask( String taskId ) + { + final AbstractTask task = instances.get( taskId ); + if ( task == null ) + return TaskStatus.ts_noSuchInstance; + if ( !(task instanceof CancellableTask) ) + return TaskStatus.ts_notCancellable; + ( (CancellableTask)task ).cancel(); + return task.getStatus(); + } + /** * Wakes up the Taskmanager's mainloop so it will check if any of the current task instances * is waiting for execution. diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java index 5741e58..2d46c5d 100644 --- a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java +++ b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java @@ -1,6 +1,7 @@ package org.openslx.taskmanager.network; import java.nio.charset.StandardCharsets; +import java.util.zip.Deflater; import org.apache.log4j.Logger; import org.openslx.taskmanager.api.TaskStatus; @@ -17,6 +18,11 @@ public class RequestParser */ private final Gson sendGson = new Gson(); + /** + * Same for the deflater + */ + private final Deflater deflater = new Deflater( Deflater.BEST_COMPRESSION, true ); + private final Taskmanager taskManager; public RequestParser( Taskmanager tm ) @@ -31,42 +37,96 @@ public class RequestParser */ public byte[] handle( String payload ) { - String[] parts = payload.split( " *, *", 3 ); + String[] parts = payload.split( ",", 3 ); // Message format is "<message id>, <command>, <command payload/argument>" if ( parts.length != 3 ) { log.debug( "Could not split message" ); return null; } + // Hacked in compression support: If parts[1] is prefixed by at + // least 5 spaces, the sender supports deflated reply payload. + boolean compress = parts[1].startsWith( " " ); + parts[0] = parts[0].trim(); + parts[1] = parts[1].trim(); + parts[2] = parts[2].trim(); // Look at parts[1], if it's "status" it's a request for the task // with the ID given in parts[2] if ( parts[1].equals( "status" ) ) { TaskStatus status = taskManager.getTaskStatus( parts[2] ); - return serialize( parts[0], status ); + return serialize( parts[0], status, compress ); } // Now check if parts[1] is "release" if ( parts[1].equals( "release" ) ) { taskManager.releaseTask( parts[2] ); return null; } + // Cancel task if parts[1] is "cancel" + if ( parts[1].equals( "cancel" ) ) { + taskManager.cancelTask( parts[2] ); + return null; + } // Anything else in parts[0] will be treated as a fresh task invocation, so let's // pass it on to the task manager. TaskStatus status = taskManager.submitTask( parts[1], parts[2] ); - return serialize( parts[0], status ); + return serialize( parts[0], status, compress ); } - private byte[] serialize( String messageId, TaskStatus status ) + private static final byte[] COMMA = { ',' }; + private static final byte[] COMPRESS_PREFIX = { '+', 'z', ':' }; + + private byte[] serialize( String messageId, TaskStatus status, boolean compress ) { - String data; + byte[] messageIdBytes = messageId.getBytes( StandardCharsets.UTF_8 ); + byte[] dataBytes; try { synchronized ( sendGson ) { - data = sendGson.toJson( status ); + dataBytes = sendGson.toJson( status ).getBytes( StandardCharsets.UTF_8 ); + } + if ( compress && dataBytes.length < 1400 ) { + // Not worth the effort for small payloads + compress = false; + } + if ( compress ) { + byte[] compressed = new byte[ dataBytes.length + 16 ]; + int compressedLen; + synchronized ( deflater ) { + deflater.setInput( dataBytes ); + deflater.finish(); + compressedLen = deflater.deflate( compressed ); + deflater.reset(); + } + if ( compressedLen != 0 && compressedLen < dataBytes.length ) { + log.debug( "Sending compressed reply (" + dataBytes.length + " -> " + compressedLen + ")" ); + dataBytes = new byte[ compressedLen ]; + System.arraycopy( compressed, 0, dataBytes, 0, compressedLen ); + } else { + // Failed or actually larger, disable compression + compress = false; + } } } catch ( Throwable e ) { - log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName() ); - log.warn( e.toString() ); + log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName(), e ); return null; } - return ( messageId + ',' + data ).getBytes( StandardCharsets.UTF_8 ); + if ( compress ) { + return concat( COMPRESS_PREFIX, messageIdBytes, COMMA, dataBytes ); + } + return concat( messageIdBytes, COMMA, dataBytes ); + } + + private byte[] concat( byte[]... arrays ) + { + int len = 0; + for ( int i = 0; i < arrays.length; ++i ) { + len += arrays[i].length; + } + byte[] ret = new byte[ len ]; + int pos = 0; + for ( int i = 0; i < arrays.length; ++i ) { + System.arraycopy( arrays[i], 0, ret, pos, arrays[i].length ); + pos += arrays[i].length; + } + return ret; } } |