package org.openslx.imagemaster.localrpc; import; import; import; import; import; import; import java.nio.charset.StandardCharsets; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; import org.openslx.bwlp.thrift.iface.ClientSessionData; import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.imagemaster.session.Session; import org.openslx.imagemaster.session.SessionManager; import; /** * The network listener that will receive incoming UDP packets, try to process * them, and then send a reply. */ public class NetworkHandler implements Runnable { private static final Logger log = Logger.getLogger( NetworkHandler.class ); private Thread sendThread = null; /** * Sender instance (Runnable handling outgoing packets) */ private final Sender sender; /** * UDP socket for sending and receiving. */ private final DatagramSocket socket; /** * Gson class */ private final Gson gson = new Gson(); /** * Initialize the NetworkHandler by starting threads and opening the socket. */ public NetworkHandler( int port, InetAddress listenAddress ) throws SocketException { socket = new DatagramSocket( port, listenAddress ); sendThread = new Thread( sender = new Sender() ); } public void shutdown() { socket.close(); } /** * Prepare and enqueue reply for client request. * Only ever to be called from the receiving thread. The reply message is crafted * and then handed over to the sending thread. * * @param destination SocketAddress of the client * @param messageId The same ID the client used in it's request. * It's echoed back to the client to enable request bursts, and has no meaning for the * server. * @param status A TaskStatus instance to be serialized to json and sent to the client. */ private void send( SocketAddress destination, byte[] buffer ) { final DatagramPacket packet; try { packet = new DatagramPacket( buffer, buffer.length, destination ); } catch ( Exception e ) { log.warn( "Could not construct datagram packet for target " + destination.toString() ); e.printStackTrace(); return; } sender.send( packet ); } /** * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode */ @Override public void run() { byte readBuffer[] = new byte[ 66000 ]; try { sendThread.start(); while ( !Thread.interrupted() ) { DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length ); try { socket.receive( packet ); } catch ( IOException e ) { "IOException on UDP socket when reading: " + e.getMessage() ); Thread.sleep( 100 ); continue; } if ( packet.getLength() < 2 ) { log.debug( "Message too short" ); continue; } String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 ); try { String reply = handle( payload ); if ( reply != null ) send( packet.getSocketAddress(), reply.getBytes( StandardCharsets.UTF_8 ) ); } catch ( Throwable t ) { log.error( "Exception in RequestParser: " + t.toString() ); log.error( "Payload was: " + payload ); t.printStackTrace(); } } } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } finally { sendThread.interrupt(); "UDP receiver finished." ); } } private String handle( String payload ) { try { JsonUser ju = gson.fromJson( payload, JsonUser.class ); UserInfo u = ju.toUser(); if ( u == null ) { log.warn( "Invalid or inomplete RPC data (" + payload + ")" ); return "Invalid or incomplete RPC data"; } ClientSessionData sd = SessionManager.addSession( new Session( u ) ); return "TOKEN:" + sd.authToken + " SESSIONID:" + sd.sessionId; } catch ( Throwable t ) { log.error( "Exception on json decode", t ); } return "Json error"; } /** * Private sending thread. * Use blocking queue, wait for packet to be added to it, then try to send. */ private class Sender implements Runnable { /** * Queue to stuff outgoing packets into. */ private final BlockingQueue queue = new LinkedBlockingQueue<>( 128 ); /** * Wait until something is put into the queue, then send it. */ @Override public void run() { try { while ( !Thread.interrupted() ) { final DatagramPacket packet; packet = queue.take(); try { socket.send( packet ); } catch ( IOException e ) { log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString() ); } } } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } finally { "UDP sender finished." ); } } /** * Add something to the outgoing packet queue. * Called from the receiving thread. */ public void send( DatagramPacket packet ) { if ( queue.offer( packet ) ) return; log.warn( "Could not add packet to queue: Full" ); } } }