summaryrefslogtreecommitdiffstats
path: root/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
diff options
context:
space:
mode:
Diffstat (limited to 'daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java')
-rw-r--r--daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java78
1 files changed, 69 insertions, 9 deletions
diff --git a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
index 5741e58..2d46c5d 100644
--- a/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
+++ b/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
@@ -1,6 +1,7 @@
package org.openslx.taskmanager.network;
import java.nio.charset.StandardCharsets;
+import java.util.zip.Deflater;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.api.TaskStatus;
@@ -17,6 +18,11 @@ public class RequestParser
*/
private final Gson sendGson = new Gson();
+ /**
+ * Same for the deflater
+ */
+ private final Deflater deflater = new Deflater( Deflater.BEST_COMPRESSION, true );
+
private final Taskmanager taskManager;
public RequestParser( Taskmanager tm )
@@ -31,42 +37,96 @@ public class RequestParser
*/
public byte[] handle( String payload )
{
- String[] parts = payload.split( " *, *", 3 );
+ String[] parts = payload.split( ",", 3 );
// Message format is "<message id>, <command>, <command payload/argument>"
if ( parts.length != 3 ) {
log.debug( "Could not split message" );
return null;
}
+ // Hacked in compression support: If parts[1] is prefixed by at
+ // least 5 spaces, the sender supports deflated reply payload.
+ boolean compress = parts[1].startsWith( " " );
+ parts[0] = parts[0].trim();
+ parts[1] = parts[1].trim();
+ parts[2] = parts[2].trim();
// Look at parts[1], if it's "status" it's a request for the task
// with the ID given in parts[2]
if ( parts[1].equals( "status" ) ) {
TaskStatus status = taskManager.getTaskStatus( parts[2] );
- return serialize( parts[0], status );
+ return serialize( parts[0], status, compress );
}
// Now check if parts[1] is "release"
if ( parts[1].equals( "release" ) ) {
taskManager.releaseTask( parts[2] );
return null;
}
+ // Cancel task if parts[1] is "cancel"
+ if ( parts[1].equals( "cancel" ) ) {
+ taskManager.cancelTask( parts[2] );
+ return null;
+ }
// Anything else in parts[0] will be treated as a fresh task invocation, so let's
// pass it on to the task manager.
TaskStatus status = taskManager.submitTask( parts[1], parts[2] );
- return serialize( parts[0], status );
+ return serialize( parts[0], status, compress );
}
- private byte[] serialize( String messageId, TaskStatus status )
+ private static final byte[] COMMA = { ',' };
+ private static final byte[] COMPRESS_PREFIX = { '+', 'z', ':' };
+
+ private byte[] serialize( String messageId, TaskStatus status, boolean compress )
{
- String data;
+ byte[] messageIdBytes = messageId.getBytes( StandardCharsets.UTF_8 );
+ byte[] dataBytes;
try {
synchronized ( sendGson ) {
- data = sendGson.toJson( status );
+ dataBytes = sendGson.toJson( status ).getBytes( StandardCharsets.UTF_8 );
+ }
+ if ( compress && dataBytes.length < 1400 ) {
+ // Not worth the effort for small payloads
+ compress = false;
+ }
+ if ( compress ) {
+ byte[] compressed = new byte[ dataBytes.length + 16 ];
+ int compressedLen;
+ synchronized ( deflater ) {
+ deflater.setInput( dataBytes );
+ deflater.finish();
+ compressedLen = deflater.deflate( compressed );
+ deflater.reset();
+ }
+ if ( compressedLen != 0 && compressedLen < dataBytes.length ) {
+ log.debug( "Sending compressed reply (" + dataBytes.length + " -> " + compressedLen + ")" );
+ dataBytes = new byte[ compressedLen ];
+ System.arraycopy( compressed, 0, dataBytes, 0, compressedLen );
+ } else {
+ // Failed or actually larger, disable compression
+ compress = false;
+ }
}
} catch ( Throwable e ) {
- log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName() );
- log.warn( e.toString() );
+ log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName(), e );
return null;
}
- return ( messageId + ',' + data ).getBytes( StandardCharsets.UTF_8 );
+ if ( compress ) {
+ return concat( COMPRESS_PREFIX, messageIdBytes, COMMA, dataBytes );
+ }
+ return concat( messageIdBytes, COMMA, dataBytes );
+ }
+
+ private byte[] concat( byte[]... arrays )
+ {
+ int len = 0;
+ for ( int i = 0; i < arrays.length; ++i ) {
+ len += arrays[i].length;
+ }
+ byte[] ret = new byte[ len ];
+ int pos = 0;
+ for ( int i = 0; i < arrays.length; ++i ) {
+ System.arraycopy( arrays[i], 0, ret, pos, arrays[i].length );
+ pos += arrays[i].length;
+ }
+ return ret;
}
}