package org.openslx.taskmanager.network;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.api.BoundedLog;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.main.Taskmanager;
import org.openslx.taskmanager.util.BoundedLogSerializer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
/**
 * Parses incoming request messages, dispatches them to the {@link Taskmanager}
 * and serializes the resulting {@link TaskStatus} into a reply packet.
 * <p>
 * Requests have the form {@code "<message id>, <command>, <payload>"}. Replies
 * have the form {@code "<message id>,<json>"}; if the reply is compressed, it is
 * prefixed with {@code "+z:"} and the JSON part is replaced by its deflated bytes.
 */
public class RequestParser
{

	private static final Logger log = Logger.getLogger( RequestParser.class );

	/** Separator between message id and reply data. */
	private static final byte[] COMMA = { ',' };

	/** Prefix put in front of a reply whose data part is deflated. */
	private static final byte[] COMPRESS_PREFIX = { '+', 'z', ':' };

	/**
	 * Number of leading spaces in front of the command that signal that the
	 * sender supports deflated reply payloads.
	 */
	private static final int COMPRESS_MARKER_SPACES = 5;

	/** Replies with fewer bytes than this are never compressed. */
	private static final int MIN_COMPRESS_SIZE = 1400;

	/**
	 * Our very own gson instance (for serializing replies)
	 */
	private final Gson sendGson = new GsonBuilder().registerTypeAdapter( BoundedLog.class, new BoundedLogSerializer() ).create();

	/**
	 * Same for the deflater (created with nowrap = true; shared, so always
	 * accessed while holding its monitor)
	 */
	private final Deflater deflater = new Deflater( Deflater.BEST_COMPRESSION, true );

	private final Taskmanager taskManager;

	/**
	 * @param tm task manager that all parsed requests are forwarded to
	 */
	public RequestParser( Taskmanager tm )
	{
		this.taskManager = tm;
	}

	/**
	 * Handle the given unparsed request.
	 *
	 * @param payload Packet data received from network, already converted to a string
	 * @return the serialized reply, or <code>null</code> if no reply is to be sent
	 *         (malformed message, "release"/"cancel" command, or serialization failure)
	 */
	public byte[] handle( String payload )
	{
		if ( payload == null ) {
			log.debug( "Could not split message" );
			return null;
		}
		// Message format is "<message id>, <command>, <command payload/argument>"
		String[] parts = payload.split( ",", 3 );
		if ( parts.length != 3 ) {
			log.debug( "Could not split message" );
			return null;
		}
		// Hacked in compression support: If parts[1] is prefixed by at
		// least 5 spaces, the sender supports deflated reply payload.
		// This has to be checked before the parts get trimmed.
		boolean compress = hasCompressionMarker( parts[1] );
		String messageId = parts[0].trim();
		String command = parts[1].trim();
		String argument = parts[2].trim();
		// "status" is a request for the status of the task with the given ID
		if ( command.equals( "status" ) ) {
			TaskStatus status = taskManager.getTaskStatus( argument );
			return serialize( messageId, status, compress );
		}
		// "release" and "cancel" act on the task with the given ID, no reply is sent
		if ( command.equals( "release" ) ) {
			taskManager.releaseTask( argument );
			return null;
		}
		if ( command.equals( "cancel" ) ) {
			taskManager.cancelTask( argument );
			return null;
		}
		// Anything else in parts[1] will be treated as a fresh task invocation, so let's
		// pass it on to the task manager.
		TaskStatus status = taskManager.submitTask( command, argument );
		return serialize( messageId, status, compress );
	}

	/**
	 * Check whether the untrimmed command part of a message starts with at
	 * least {@link #COMPRESS_MARKER_SPACES} space characters.
	 */
	private static boolean hasCompressionMarker( String rawCommand )
	{
		if ( rawCommand.length() < COMPRESS_MARKER_SPACES ) {
			return false;
		}
		for ( int i = 0; i < COMPRESS_MARKER_SPACES; ++i ) {
			if ( rawCommand.charAt( i ) != ' ' ) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Build the reply packet for the given status.
	 *
	 * @param messageId message id of the request, echoed back in the reply
	 * @param status status object to serialize as JSON
	 * @param compress whether the requester accepts a deflated reply
	 * @return reply bytes, or <code>null</code> if serialization failed
	 */
	private byte[] serialize( String messageId, TaskStatus status, boolean compress )
	{
		byte[] messageIdBytes = messageId.getBytes( StandardCharsets.UTF_8 );
		byte[] dataBytes;
		try {
			synchronized ( sendGson ) {
				dataBytes = sendGson.toJson( status ).getBytes( StandardCharsets.UTF_8 );
			}
			if ( compress && dataBytes.length < MIN_COMPRESS_SIZE ) {
				// Not worth the effort for small payloads
				compress = false;
			}
			if ( compress ) {
				byte[] compressed = new byte[ dataBytes.length + 16 ];
				int compressedLen;
				boolean complete;
				synchronized ( deflater ) {
					try {
						deflater.setInput( dataBytes );
						deflater.finish();
						compressedLen = deflater.deflate( compressed );
						complete = deflater.finished();
					} finally {
						// Always leave the shared deflater in a clean state for the next reply
						deflater.reset();
					}
				}
				if ( complete && compressedLen != 0 && compressedLen < dataBytes.length ) {
					log.debug( "Sending compressed reply (" + dataBytes.length + " -> " + compressedLen + ")" );
					dataBytes = java.util.Arrays.copyOf( compressed, compressedLen );
				} else {
					// Failed or actually larger, disable compression
					compress = false;
				}
			}
		} catch ( Throwable e ) {
			// Guard against null so the error handler cannot throw by itself
			String className = status == null ? "(null)" : status.getStatusObjectClassName();
			log.warn( "Could not serialize reply with TaskStatus " + className, e );
			return null;
		}
		if ( compress ) {
			return concat( COMPRESS_PREFIX, messageIdBytes, COMMA, dataBytes );
		}
		return concat( messageIdBytes, COMMA, dataBytes );
	}

	/**
	 * Concatenate the given byte arrays, in order, into one newly allocated array.
	 */
	private byte[] concat( byte[]... arrays )
	{
		int len = 0;
		for ( byte[] array : arrays ) {
			len += array.length;
		}
		byte[] ret = new byte[ len ];
		int pos = 0;
		for ( byte[] array : arrays ) {
			System.arraycopy( array, 0, ret, pos, array.length );
			pos += array.length;
		}
		return ret;
	}

}