diff options
Diffstat (limited to 'src/main/java/org/openslx/imagemaster/localrpc/NetworkHandler.java')
-rw-r--r-- | src/main/java/org/openslx/imagemaster/localrpc/NetworkHandler.java | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/src/main/java/org/openslx/imagemaster/localrpc/NetworkHandler.java b/src/main/java/org/openslx/imagemaster/localrpc/NetworkHandler.java new file mode 100644 index 0000000..96b1212 --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/localrpc/NetworkHandler.java @@ -0,0 +1,188 @@ +package org.openslx.imagemaster.localrpc; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; +import org.openslx.bwlp.thrift.iface.ClientSessionData; +import org.openslx.imagemaster.session.Session; +import org.openslx.imagemaster.session.SessionManager; +import org.openslx.imagemaster.session.User; + +import com.google.gson.Gson; + +/** + * The network listener that will receive incoming UDP packets, try to process + * them, and then send a reply. + */ +public class NetworkHandler implements Runnable +{ + + private static final Logger log = Logger.getLogger( NetworkHandler.class ); + + private Thread sendThread = null; + /** + * Sender instance (Runnable handling outgoing packets) + */ + private final Sender sender; + /** + * UDP socket for sending and receiving. + */ + private final DatagramSocket socket; + /** + * Gson class + */ + private final Gson gson = new Gson(); + + /** + * Initialize the NetworkHandler by starting threads and opening the socket. + */ + public NetworkHandler( int port, InetAddress listenAddress ) throws SocketException + { + socket = new DatagramSocket( port, listenAddress ); + sendThread = new Thread( sender = new Sender() ); + } + + public void shutdown() + { + socket.close(); + } + + /** + * Prepare and enqueue reply for client request. + * Only ever to be called from the receiving thread. The reply message is crafted + * and then handed over to the sending thread. + * + * @param destination SocketAddress of the client + * @param messageId The same ID the client used in it's request. + * It's echoed back to the client to enable request bursts, and has no meaning for the + * server. + * @param status A TaskStatus instance to be serialized to json and sent to the client. + */ + private void send( SocketAddress destination, byte[] buffer ) + { + final DatagramPacket packet; + try { + packet = new DatagramPacket( buffer, buffer.length, destination ); + } catch ( SocketException e ) { + log.warn( "Could not construct datagram packet for target " + destination.toString() ); + e.printStackTrace(); + return; + } + sender.send( packet ); + } + + /** + * Main loop of receiving thread - wait until a packet arrives, then try to handle/decode + */ + @Override + public void run() + { + byte readBuffer[] = new byte[ 66000 ]; + try { + sendThread.start(); + while ( !Thread.interrupted() ) { + DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length ); + try { + socket.receive( packet ); + } catch ( IOException e ) { + log.info( "IOException on UDP socket when reading: " + e.getMessage() ); + Thread.sleep( 100 ); + continue; + } + if ( packet.getLength() < 2 ) { + log.debug( "Message too short" ); + continue; + } + String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 ); + try { + String reply = handle( payload ); + if ( reply != null ) + send( packet.getSocketAddress(), reply.getBytes( StandardCharsets.UTF_8 ) ); + } catch ( Throwable t ) { + log.error( "Exception in RequestParser: " + t.toString() ); + log.error( "Payload was: " + payload ); + t.printStackTrace(); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + sendThread.interrupt(); + log.info( "UDP receiver finished." ); + } + } + + private String handle( String payload ) + { + try { + JsonUser ju = gson.fromJson( payload, JsonUser.class ); + User u = ju.toUser(); + if ( u == null ) { + log.warn( "Invalid or inomplete RPC data (" + payload + ")" ); + return "Invalid or incomplete RPC data"; + } + ClientSessionData sd = SessionManager.addSession( new Session( u ) ); + return "TOKEN:" + sd.authToken + " SESSIONID:" + sd.sessionId; + } catch ( Throwable t ) { + log.error( "Exception on json decode", t ); + } + return "Json error"; + } + + /** + * Private sending thread. + * Use blocking queue, wait for packet to be added to it, then try to send. + */ + private class Sender implements Runnable + { + + /** + * Queue to stuff outgoing packets into. + */ + private final BlockingQueue<DatagramPacket> queue = new LinkedBlockingQueue<>( 128 ); + + /** + * Wait until something is put into the queue, then send it. + */ + @Override + public void run() + { + try { + while ( !Thread.interrupted() ) { + final DatagramPacket packet; + packet = queue.take(); + try { + socket.send( packet ); + } catch ( IOException e ) { + log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString() ); + } + } + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } finally { + log.info( "UDP sender finished." ); + } + } + + /** + * Add something to the outgoing packet queue. + * Called from the receiving thread. + */ + public void send( DatagramPacket packet ) + { + if ( queue.offer( packet ) ) + return; + log.warn( "Could not add packet to queue: Full" ); + } + + } + +} |