summaryrefslogtreecommitdiffstats
path: root/daemon/src/main/java/org/openslx/taskmanager/network/RequestParser.java
blob: e1f883e6dc9a9e90b4bf75d7aa6ab880c69de94b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package org.openslx.taskmanager.network;

import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

import org.apache.log4j.Logger;
import org.openslx.taskmanager.api.BoundedLog;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.main.Taskmanager;
import org.openslx.taskmanager.util.BoundedLogSerializer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

/**
 * Parses textual requests of the form
 * {@code <message id>, <command>, <command payload/argument>}, forwards them
 * to the {@link Taskmanager} and serializes the resulting {@link TaskStatus}
 * into the reply bytes.
 */
public class RequestParser
{
	private static final Logger log = Logger.getLogger( RequestParser.class );

	/**
	 * Separator between message id and payload in a reply.
	 */
	private static final byte[] COMMA = { ',' };

	/**
	 * Prepended to a reply whose payload has been deflated.
	 */
	private static final byte[] COMPRESS_PREFIX = { '+', 'z', ':' };

	/**
	 * Serialized payloads shorter than this many bytes are never compressed,
	 * as it is not worth the effort for small payloads.
	 */
	private static final int MIN_COMPRESS_SIZE = 1400;

	/**
	 * Our very own gson instance (for serializing replies)
	 */
	private final Gson sendGson = new GsonBuilder().registerTypeAdapter( BoundedLog.class, new BoundedLogSerializer() ).create();

	/**
	 * Same for the deflater. Created with nowrap = true; access to it is
	 * guarded by synchronizing on the instance.
	 */
	private final Deflater deflater = new Deflater( Deflater.BEST_COMPRESSION, true );

	private final Taskmanager taskManager;

	public RequestParser( Taskmanager tm )
	{
		this.taskManager = tm;
	}

	/**
	 * Handle the given unparsed request.
	 * 
	 * @param payload Packet data received from network, already converted to a string
	 * @return the serialized reply, or <code>null</code> if there is nothing to
	 *         reply: the message could not be split into three parts, the
	 *         command was "release" or "cancel", or serializing the reply failed
	 */
	public byte[] handle( String payload )
	{
		if ( payload == null ) {
			// Treat a missing payload like any other malformed message
			log.debug( "Could not split message" );
			return null;
		}
		String[] parts = payload.split( ",", 3 );
		// Message format is "<message id>, <command>, <command payload/argument>"
		if ( parts.length != 3 ) {
			log.debug( "Could not split message" );
			return null;
		}
		// Hacked in compression support: If the command is prefixed by at
		// least 5 spaces, the sender supports deflated reply payload.
		// This has to be checked before the command is trimmed.
		boolean compress = parts[1].startsWith( "     " );
		String messageId = parts[0].trim();
		String command = parts[1].trim();
		String argument = parts[2].trim();
		switch ( command ) {
		case "status":
			// Request for the status of the task with the ID given as argument
			return serialize( messageId, taskManager.getTaskStatus( argument ), compress );
		case "release":
			taskManager.releaseTask( argument );
			return null;
		case "cancel":
			taskManager.cancelTask( argument );
			return null;
		default:
			// Any other command will be treated as a fresh task invocation,
			// so let's pass it on to the task manager.
			return serialize( messageId, taskManager.submitTask( command, argument ), compress );
		}
	}

	/**
	 * Build the reply "&lt;message id&gt;,&lt;json&gt;" for the given status. If
	 * compression is requested and pays off, the json part is deflated and the
	 * whole reply is prefixed with "+z:".
	 * 
	 * @param messageId id of the request this reply belongs to
	 * @param status status object to serialize as json
	 * @param compress whether the requester accepts a deflated payload
	 * @return reply bytes, or <code>null</code> if serialization failed
	 */
	private byte[] serialize( String messageId, TaskStatus status, boolean compress )
	{
		byte[] messageIdBytes = messageId.getBytes( StandardCharsets.UTF_8 );
		byte[] dataBytes;
		try {
			synchronized ( sendGson ) {
				dataBytes = sendGson.toJson( status ).getBytes( StandardCharsets.UTF_8 );
			}
			if ( compress && dataBytes.length < MIN_COMPRESS_SIZE ) {
				// Not worth the effort for small payloads
				compress = false;
			}
			if ( compress ) {
				byte[] compressed = deflate( dataBytes );
				if ( compressed != null ) {
					log.debug( "Sending compressed reply (" + dataBytes.length + " -> " + compressed.length + ")" );
					dataBytes = compressed;
				} else {
					// Failed or actually larger, disable compression
					compress = false;
				}
			}
		} catch ( Throwable e ) {
			// status may be null here; don't let the error handler itself throw
			String className = status == null ? "null" : status.getStatusObjectClassName();
			log.warn( "Could not serialize reply with TaskStatus " + className, e );
			return null;
		}
		if ( compress ) {
			return concat( COMPRESS_PREFIX, messageIdBytes, COMMA, dataBytes );
		}
		return concat( messageIdBytes, COMMA, dataBytes );
	}

	/**
	 * Deflate the given data using the shared deflater.
	 * 
	 * @param input data to compress
	 * @return the compressed data, or <code>null</code> if nothing was produced
	 *         or the result is not smaller than the input
	 */
	private byte[] deflate( byte[] input )
	{
		byte[] buffer = new byte[ input.length + 16 ];
		int compressedLen;
		synchronized ( deflater ) {
			try {
				deflater.setInput( input );
				deflater.finish();
				compressedLen = deflater.deflate( buffer );
			} finally {
				// Always reset, so a failure cannot leave stale input
				// behind for the next reply
				deflater.reset();
			}
		}
		if ( compressedLen == 0 || compressedLen >= input.length ) {
			return null;
		}
		byte[] ret = new byte[ compressedLen ];
		System.arraycopy( buffer, 0, ret, 0, compressedLen );
		return ret;
	}

	/**
	 * Concatenate the given arrays, in order, into one newly allocated array.
	 * 
	 * @param arrays arrays to join
	 * @return new array containing the contents of all given arrays
	 */
	private static byte[] concat( byte[]... arrays )
	{
		int len = 0;
		for ( byte[] array : arrays ) {
			len += array.length;
		}
		byte[] ret = new byte[ len ];
		int pos = 0;
		for ( byte[] array : arrays ) {
			System.arraycopy( array, 0, ret, pos, array.length );
			pos += array.length;
		}
		return ret;
	}

}