summaryrefslogtreecommitdiffstats
path: root/daemon
diff options
context:
space:
mode:
authorSimon Rettberg2016-12-01 15:13:57 +0100
committerSimon Rettberg2016-12-01 15:13:57 +0100
commit617fb197e7e5a7be033fb5114bc72669f440da27 (patch)
tree843ce0270c86e393de3873131e574bf069d818f8 /daemon
parentDatagramPacket constructor doesn't throw SocketException in SE 8+ (diff)
downloadtaskman-lite-617fb197e7e5a7be033fb5114bc72669f440da27.tar.gz
taskman-lite-617fb197e7e5a7be033fb5114bc72669f440da27.tar.xz
taskman-lite-617fb197e7e5a7be033fb5114bc72669f440da27.zip
Add cancellable tasks, optional response payload compression
Diffstat (limited to 'daemon')
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java12
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java78
2 files changed, 81 insertions, 9 deletions
diff --git a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
index 69c190a..b4df03d 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/main/Taskmanager.java
@@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
import org.openslx.taskmanager.api.AbstractTask;
+import org.openslx.taskmanager.api.CancellableTask;
import org.openslx.taskmanager.api.FinishCallback;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.util.ClassLoaderHack;
@@ -131,6 +132,17 @@ public class Taskmanager implements FinishCallback, Runnable
task.release();
}
+ public TaskStatus cancelTask( String taskId )
+ {
+ final AbstractTask task = instances.get( taskId );
+ if ( task == null )
+ return TaskStatus.ts_noSuchInstance;
+ if ( !(task instanceof CancellableTask) )
+ return TaskStatus.ts_notCancellable;
+ ( (CancellableTask)task ).cancel();
+ return task.getStatus();
+ }
+
/**
* Wakes up the Taskmanager's mainloop so it will check if any of the current task instances
* is waiting for execution.
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
index 5741e58..2d46c5d 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
@@ -1,6 +1,7 @@
package org.openslx.taskmanager.network;
import java.nio.charset.StandardCharsets;
+import java.util.zip.Deflater;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.api.TaskStatus;
@@ -17,6 +18,11 @@ public class RequestParser
*/
private final Gson sendGson = new Gson();
+ /**
+ * Same for the deflater
+ */
+ private final Deflater deflater = new Deflater( Deflater.BEST_COMPRESSION, true );
+
private final Taskmanager taskManager;
public RequestParser( Taskmanager tm )
@@ -31,42 +37,96 @@ public class RequestParser
*/
public byte[] handle( String payload )
{
- String[] parts = payload.split( " *, *", 3 );
+ String[] parts = payload.split( ",", 3 );
// Message format is "<message id>, <command>, <command payload/argument>"
if ( parts.length != 3 ) {
log.debug( "Could not split message" );
return null;
}
+ // Hacked in compression support: If parts[1] is prefixed by at
+ // least 5 spaces, the sender supports deflated reply payload.
+ boolean compress = parts[1].startsWith( " " );
+ parts[0] = parts[0].trim();
+ parts[1] = parts[1].trim();
+ parts[2] = parts[2].trim();
// Look at parts[1], if it's "status" it's a request for the task
// with the ID given in parts[2]
if ( parts[1].equals( "status" ) ) {
TaskStatus status = taskManager.getTaskStatus( parts[2] );
- return serialize( parts[0], status );
+ return serialize( parts[0], status, compress );
}
// Now check if parts[1] is "release"
if ( parts[1].equals( "release" ) ) {
taskManager.releaseTask( parts[2] );
return null;
}
+ // Cancel task if parts[1] is "cancel"
+ if ( parts[1].equals( "cancel" ) ) {
+ taskManager.cancelTask( parts[2] );
+ return null;
+ }
// Anything else in parts[0] will be treated as a fresh task invocation, so let's
// pass it on to the task manager.
TaskStatus status = taskManager.submitTask( parts[1], parts[2] );
- return serialize( parts[0], status );
+ return serialize( parts[0], status, compress );
}
- private byte[] serialize( String messageId, TaskStatus status )
+ private static final byte[] COMMA = { ',' };
+ private static final byte[] COMPRESS_PREFIX = { '+', 'z', ':' };
+
+ private byte[] serialize( String messageId, TaskStatus status, boolean compress )
{
- String data;
+ byte[] messageIdBytes = messageId.getBytes( StandardCharsets.UTF_8 );
+ byte[] dataBytes;
try {
synchronized ( sendGson ) {
- data = sendGson.toJson( status );
+ dataBytes = sendGson.toJson( status ).getBytes( StandardCharsets.UTF_8 );
+ }
+ if ( compress && dataBytes.length < 1400 ) {
+ // Not worth the effort for small payloads
+ compress = false;
+ }
+ if ( compress ) {
+ byte[] compressed = new byte[ dataBytes.length + 16 ];
+ int compressedLen;
+ synchronized ( deflater ) {
+ deflater.setInput( dataBytes );
+ deflater.finish();
+ compressedLen = deflater.deflate( compressed );
+ deflater.reset();
+ }
+ if ( compressedLen != 0 && compressedLen < dataBytes.length ) {
+ log.debug( "Sending compressed reply (" + dataBytes.length + " -> " + compressedLen + ")" );
+ dataBytes = new byte[ compressedLen ];
+ System.arraycopy( compressed, 0, dataBytes, 0, compressedLen );
+ } else {
+ // Failed or actually larger, disable compression
+ compress = false;
+ }
}
} catch ( Throwable e ) {
- log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName() );
- log.warn( e.toString() );
+ log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName(), e );
return null;
}
- return ( messageId + ',' + data ).getBytes( StandardCharsets.UTF_8 );
+ if ( compress ) {
+ return concat( COMPRESS_PREFIX, messageIdBytes, COMMA, dataBytes );
+ }
+ return concat( messageIdBytes, COMMA, dataBytes );
+ }
+
+ private byte[] concat( byte[]... arrays )
+ {
+ int len = 0;
+ for ( int i = 0; i < arrays.length; ++i ) {
+ len += arrays[i].length;
+ }
+ byte[] ret = new byte[ len ];
+ int pos = 0;
+ for ( int i = 0; i < arrays.length; ++i ) {
+ System.arraycopy( arrays[i], 0, ret, pos, arrays[i].length );
+ pos += arrays[i].length;
+ }
+ return ret;
}
}