summaryrefslogblamecommitdiffstats
path: root/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
blob: e1f883e6dc9a9e90b4bf75d7aa6ab880c69de94b (plain) (tree)
1
2
3
4
5
6
7
8
9


                                         
                              

                               
                                              

                                                
                                                         

                            
                                   







                                                                                  
                                                                                                                                     
 




                                                                                          





                                              



                                             

                                                                                          
                                              
         
                                                         




                                                                                          





                                                                               


                                                                                 
                                                                                  
                                                                       


                                                     
                                                            

                                    




                                                           

                                                                                                 
                                                                                 
                                                               

         



                                                                                         
         

                                                                                     

                                                   






















                                                                                                                                    

                                         
                                                                                                                        

                                    


















                                                                                           


         
package org.openslx.taskmanager.network;

import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

import org.apache.log4j.Logger;
import org.openslx.taskmanager.api.BoundedLog;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.main.Taskmanager;
import org.openslx.taskmanager.util.BoundedLogSerializer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

/**
 * Parses textual requests, forwards them to the {@link Taskmanager} and builds
 * the reply bytes for those requests that produce a reply.
 * <p>
 * A request has the form {@code <message id>,<command>,<argument>}. The reply
 * (if any) has the form {@code <message id>,<json>}, or
 * {@code +z:<message id>,<raw deflate data>} if the sender signalled support
 * for compressed replies and compression actually made the payload smaller.
 */
public class RequestParser
{
	private static final Logger log = Logger.getLogger( RequestParser.class );

	/**
	 * Reply payloads shorter than this many bytes are never compressed.
	 */
	private static final int MIN_COMPRESS_SIZE = 1400;

	private static final byte[] COMMA = { ',' };
	private static final byte[] COMPRESS_PREFIX = { '+', 'z', ':' };

	/**
	 * Our very own gson instance (for serializing replies)
	 */
	private final Gson sendGson = new GsonBuilder().registerTypeAdapter( BoundedLog.class, new BoundedLogSerializer() ).create();

	/**
	 * Same for the deflater. Created with nowrap = true, so it emits a raw
	 * deflate stream without zlib header/checksum. Access is guarded by
	 * synchronizing on the instance itself.
	 */
	private final Deflater deflater = new Deflater( Deflater.BEST_COMPRESSION, true );

	private final Taskmanager taskManager;

	/**
	 * @param tm task manager that all parsed requests are forwarded to
	 */
	public RequestParser( Taskmanager tm )
	{
		this.taskManager = tm;
	}

	/**
	 * Handle the given unparsed request.
	 * 
	 * @param payload Packet data received from network, already converted to a string
	 * @return the reply to send back, or <code>null</code> if the request was
	 *         malformed, does not produce a reply ("release", "cancel"), or
	 *         the reply could not be serialized
	 */
	public byte[] handle( String payload )
	{
		if ( payload == null ) {
			// Treat a missing payload like any other malformed message instead of throwing
			log.debug( "Could not split message" );
			return null;
		}
		String[] parts = payload.split( ",", 3 );
		// Message format is "<message id>, <command>, <command payload/argument>"
		if ( parts.length != 3 ) {
			log.debug( "Could not split message" );
			return null;
		}
		// Hacked in compression support: If parts[1] is prefixed by at
		// least 5 spaces, the sender supports deflated reply payload.
		// Has to be checked before trimming, obviously.
		boolean compress = parts[1].startsWith( "     " );
		String messageId = parts[0].trim();
		String command = parts[1].trim();
		String argument = parts[2].trim();
		// If the command is "status" it's a request for the task
		// with the ID given in the argument
		if ( command.equals( "status" ) ) {
			TaskStatus status = taskManager.getTaskStatus( argument );
			return serialize( messageId, status, compress );
		}
		// Now check if the command is "release"
		if ( command.equals( "release" ) ) {
			taskManager.releaseTask( argument );
			return null;
		}
		// Cancel task if the command is "cancel"
		if ( command.equals( "cancel" ) ) {
			taskManager.cancelTask( argument );
			return null;
		}
		// Any other command will be treated as a fresh task invocation, so let's
		// pass it on to the task manager.
		TaskStatus status = taskManager.submitTask( command, argument );
		return serialize( messageId, status, compress );
	}

	/**
	 * Build the reply packet for the given message id and task status.
	 * 
	 * @param messageId id of the request this is the reply to
	 * @param status status object to serialize as json
	 * @param compress whether the receiver accepts a deflated payload
	 * @return reply bytes, or <code>null</code> if serialization failed
	 */
	private byte[] serialize( String messageId, TaskStatus status, boolean compress )
	{
		byte[] messageIdBytes = messageId.getBytes( StandardCharsets.UTF_8 );
		byte[] dataBytes;
		try {
			synchronized ( sendGson ) {
				dataBytes = sendGson.toJson( status ).getBytes( StandardCharsets.UTF_8 );
			}
			if ( compress && dataBytes.length < MIN_COMPRESS_SIZE ) {
				// Not worth the effort for small payloads
				compress = false;
			}
			if ( compress ) {
				byte[] compressed = deflate( dataBytes );
				if ( compressed != null ) {
					log.debug( "Sending compressed reply (" + dataBytes.length + " -> " + compressed.length + ")" );
					dataBytes = compressed;
				} else {
					// Failed or actually larger, disable compression
					compress = false;
				}
			}
		} catch ( Throwable e ) {
			// Deliberately broad: a failure here must never take down the caller.
			// Guard against a null status so the handler itself cannot throw.
			log.warn( "Could not serialize reply with TaskStatus "
					+ ( status == null ? "null" : status.getStatusObjectClassName() ), e );
			return null;
		}
		if ( compress ) {
			return concat( COMPRESS_PREFIX, messageIdBytes, COMMA, dataBytes );
		}
		return concat( messageIdBytes, COMMA, dataBytes );
	}

	/**
	 * Compress the given data using the shared deflater.
	 * 
	 * @param input data to compress
	 * @return the compressed data, or <code>null</code> if compression failed
	 *         or did not make the data smaller
	 */
	private byte[] deflate( byte[] input )
	{
		byte[] buffer = new byte[ input.length + 16 ];
		int compressedLen;
		boolean finished;
		synchronized ( deflater ) {
			try {
				deflater.setInput( input );
				deflater.finish();
				compressedLen = deflater.deflate( buffer );
				finished = deflater.finished();
			} finally {
				// Always reset, so an exception cannot leave stale state behind for the next reply
				deflater.reset();
			}
		}
		if ( !finished || compressedLen == 0 || compressedLen >= input.length ) {
			return null;
		}
		byte[] ret = new byte[ compressedLen ];
		System.arraycopy( buffer, 0, ret, 0, compressedLen );
		return ret;
	}

	/**
	 * Concatenate the given byte arrays, in order, into a newly allocated array.
	 */
	private static byte[] concat( byte[]... arrays )
	{
		int len = 0;
		for ( byte[] array : arrays ) {
			len += array.length;
		}
		byte[] ret = new byte[ len ];
		int pos = 0;
		for ( byte[] array : arrays ) {
			System.arraycopy( array, 0, ret, pos, array.length );
			pos += array.length;
		}
		return ret;
	}

}