|
|
package org.openslx.taskmanager.network;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
/**
* The network listener that will receive incoming UDP packets, try to process
* them, and then send a reply.
*/
public class NetworkHandler implements Runnable
{
private static final Logger log = Logger.getLogger( NetworkHandler.class );
// Static part
private Thread sendThread = null;
/**
* Sender instance (Runnable handling outgoing packets)
*/
private final Sender sender;
/**
* UDP socket for sending and receiving.
*/
private final DatagramSocket socket;
private final RequestParser parser;
/**
* Initialize the NetworkHandler by starting threads and opening the socket.
*/
public NetworkHandler( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException
{
socket = new DatagramSocket( port, listenAddress );
sendThread = new Thread( sender = new Sender() );
this.parser = parser;
}
public void shutdown()
{
socket.close();
}
// Class part
/**
* Prepare and enqueue reply for client request.
* Only ever to be called from the receiving thread. The reply message is crafted
* and then handed over to the sending thread.
*
* @param destination SocketAddress of the client
* @param messageId The same ID the client used in it's request.
* It's echoed back to the client to enable request bursts, and has no meaning for the
* server.
* @param status A TaskStatus instance to be serialized to json and sent to the client.
*/
private void send( SocketAddress destination, byte[] buffer )
{
final DatagramPacket packet;
try {
packet = new DatagramPacket( buffer, buffer.length, destination );
} catch ( Exception e ) {
log.warn( "Could not construct datagram packet for target " + destination.toString() );
e.printStackTrace();
return;
}
sender.send( packet );
}
/**
* Main loop of receiving thread - wait until a packet arrives, then try to handle/decode
*/
@Override
public void run()
{
byte readBuffer[] = new byte[ 66000 ];
try {
sendThread.start();
while ( !Global.doShutdown ) {
DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length );
try {
socket.receive( packet );
} catch ( IOException e ) {
log.info( "IOException on UDP socket when reading: " + e.getMessage() );
Thread.sleep( 100 );
continue;
}
if ( packet.getLength() < 2 ) {
log.debug( "Message too short" );
continue;
}
String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
try {
byte[] reply = parser.handle( payload );
if ( reply != null )
send( packet.getSocketAddress(), reply );
} catch ( Throwable t ) {
log.error( "Exception in RequestParser: " + t.toString() );
log.error( "Payload was: " + payload );
t.printStackTrace();
}
}
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
} finally {
Global.doShutdown = true;
sendThread.interrupt();
log.info( "UDP receiver finished." );
}
}
/**
* Private sending thread.
* Use blocking queue, wait for packet to be added to it, then try to send.
*/
private class Sender implements Runnable
{
/**
* Queue to stuff outgoing packets into.
*/
private final BlockingQueue<DatagramPacket> queue = new LinkedBlockingQueue<>( 128 );
/**
* Wait until something is put into the queue, then send it.
*/
@Override
public void run()
{
try {
while ( !Global.doShutdown ) {
final DatagramPacket packet;
packet = queue.take();
try {
socket.send( packet );
} catch ( IOException e ) {
log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString(), e );
}
}
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
} finally {
Global.doShutdown = true;
log.info( "UDP sender finished." );
}
}
/**
* Add something to the outgoing packet queue.
* Called from the receiving thread.
*/
public void send( DatagramPacket packet )
{
if ( queue.offer( packet ) )
return;
log.warn( "Could not add packet to queue: Full" );
}
}
}
|