package org.openslx.taskmanager.network;

import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

import org.apache.log4j.Logger;
import org.openslx.taskmanager.api.BoundedLog;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.main.Taskmanager;
import org.openslx.taskmanager.util.BoundedLogSerializer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

/**
 * Parses textual requests received from the network, forwards them to the
 * {@link Taskmanager} and builds the binary reply (if any).
 *
 * JSON serialization and compression each run inside a synchronized block on
 * the respective shared instance.
 */
public class RequestParser
{

	private static final Logger log = Logger.getLogger( RequestParser.class );

	/**
	 * Replies smaller than this many bytes are never compressed.
	 */
	private static final int MIN_COMPRESS_SIZE = 1400;

	/**
	 * A sender signals that it accepts deflated replies by prefixing the
	 * second message field with at least this many spaces.
	 */
	private static final String COMPRESS_REQUEST_PREFIX = "     ";

	private static final byte[] COMMA = { ',' };

	/**
	 * Marker prepended to a reply whose payload is deflated.
	 */
	private static final byte[] COMPRESS_PREFIX = { '+', 'z', ':' };

	/**
	 * Our very own gson instance (for serializing replies)
	 */
	private final Gson sendGson = new GsonBuilder().registerTypeAdapter( BoundedLog.class, new BoundedLogSerializer() ).create();

	/**
	 * Same for the deflater (raw deflate stream, i.e. "nowrap" is enabled)
	 */
	private final Deflater deflater = new Deflater( Deflater.BEST_COMPRESSION, true );

	private final Taskmanager taskManager;

	public RequestParser( Taskmanager tm )
	{
		this.taskManager = tm;
	}

	/**
	 * Handle the given unparsed request.
	 *
	 * @param payload Packet data received from network, already converted to a string
	 * @return the reply to send back, or <code>null</code> if the request could not be
	 *         parsed, does not produce a reply ("release", "cancel"), or the reply
	 *         could not be serialized
	 */
	public byte[] handle( String payload )
	{
		if ( payload == null ) {
			log.debug( "Could not split message" );
			return null;
		}
		// Message format is "<message id>,<command or task name>,<argument>"
		String[] parts = payload.split( ",", 3 );
		if ( parts.length != 3 ) {
			log.debug( "Could not split message" );
			return null;
		}
		// Hacked in compression support: If parts[1] is prefixed by at
		// least 5 spaces, the sender supports deflated reply payload.
		// This has to be checked before the fields get trimmed below.
		final boolean compress = parts[1].startsWith( COMPRESS_REQUEST_PREFIX );
		final String messageId = parts[0].trim();
		final String command = parts[1].trim();
		final String argument = parts[2].trim();
		// If the command is "status" it's a request for the status of the task
		// with the ID given in the argument
		if ( command.equals( "status" ) ) {
			TaskStatus status = taskManager.getTaskStatus( argument );
			return serialize( messageId, status, compress );
		}
		// "release" and "cancel" are forwarded without sending any reply
		if ( command.equals( "release" ) ) {
			taskManager.releaseTask( argument );
			return null;
		}
		if ( command.equals( "cancel" ) ) {
			taskManager.cancelTask( argument );
			return null;
		}
		// Anything else in the command field will be treated as a fresh task
		// invocation, so let's pass it on to the task manager.
		TaskStatus status = taskManager.submitTask( command, argument );
		return serialize( messageId, status, compress );
	}

	/**
	 * Build the reply for the given message id and task status.
	 *
	 * The reply is "&lt;messageId&gt;,&lt;json&gt;". If compression was requested and
	 * actually shrinks the payload, it is "+z:&lt;messageId&gt;,&lt;deflated json&gt;" instead.
	 *
	 * @param messageId id from the request, echoed back in the reply
	 * @param status status object to serialize as JSON
	 * @param compress whether the sender accepts a deflated payload
	 * @return reply bytes, or <code>null</code> if serializing or compressing failed
	 */
	private byte[] serialize( String messageId, TaskStatus status, boolean compress )
	{
		byte[] messageIdBytes = messageId.getBytes( StandardCharsets.UTF_8 );
		byte[] dataBytes;
		byte[] compressedBytes = null;
		try {
			synchronized ( sendGson ) {
				dataBytes = sendGson.toJson( status ).getBytes( StandardCharsets.UTF_8 );
			}
			// Not worth the effort for small payloads
			if ( compress && dataBytes.length >= MIN_COMPRESS_SIZE ) {
				compressedBytes = deflate( dataBytes );
				if ( compressedBytes != null ) {
					log.debug( "Sending compressed reply (" + dataBytes.length + " -> " + compressedBytes.length + ")" );
				}
			}
		} catch ( Throwable e ) {
			// Guard against a null status so the error handler itself cannot throw
			String className = status == null ? "null" : status.getStatusObjectClassName();
			log.warn( "Could not serialize reply with TaskStatus " + className, e );
			return null;
		}
		if ( compressedBytes != null ) {
			return concat( COMPRESS_PREFIX, messageIdBytes, COMMA, compressedBytes );
		}
		return concat( messageIdBytes, COMMA, dataBytes );
	}

	/**
	 * Deflate the given data using the shared deflater.
	 *
	 * @param input data to compress
	 * @return the compressed data, or <code>null</code> if compression produced no
	 *         output or the result is not smaller than the input
	 */
	private byte[] deflate( byte[] input )
	{
		// The buffer is larger than the input, so output that does not fit
		// cannot be smaller than the input and is rejected by the check below.
		byte[] buffer = new byte[ input.length + 16 ];
		int compressedLen;
		synchronized ( deflater ) {
			try {
				deflater.setInput( input );
				deflater.finish();
				compressedLen = deflater.deflate( buffer );
			} finally {
				// Always reset, so a failure cannot leave stale state behind
				// for the next request using this shared instance
				deflater.reset();
			}
		}
		if ( compressedLen == 0 || compressedLen >= input.length ) {
			// Failed or actually larger, don't use compression
			return null;
		}
		byte[] result = new byte[ compressedLen ];
		System.arraycopy( buffer, 0, result, 0, compressedLen );
		return result;
	}

	/**
	 * Concatenate the given arrays, in order, into one newly allocated array.
	 */
	private static byte[] concat( byte[]... arrays )
	{
		int len = 0;
		for ( byte[] array : arrays ) {
			len += array.length;
		}
		byte[] ret = new byte[ len ];
		int pos = 0;
		for ( byte[] array : arrays ) {
			System.arraycopy( array, 0, ret, pos, array.length );
			pos += array.length;
		}
		return ret;
	}

}