blob: e37b3d73d282e7347ff7f0233bdbb44a2dc7b9f5 (
plain) (
tree)
|
|
package org.openslx.taskmanager.network;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
/**
 * The network listener that accepts incoming TCP connections and serves each
 * client on a pooled worker thread. After a password handshake, every
 * length-prefixed request is handed to the request parser and any reply is
 * written back on the same connection.
 * <p>
 * Wire format, in both directions: a 4-byte big-endian length (as written by
 * {@link DataOutputStream#writeInt(int)}) followed by that many payload bytes.
 * Incoming payloads are decoded as UTF-8.
 */
public class NetworkHandlerTcp extends NetworkHandlerBase
{
	private static final Logger log = Logger.getLogger( NetworkHandlerTcp.class );

	/**
	 * TCP server socket on which client connections are accepted.
	 */
	private final ServerSocket socket;

	/**
	 * Workers that serve connected clients: between 1 and 8 threads, idle
	 * threads above the core size are released after one minute. The
	 * {@link SynchronousQueue} never buffers tasks, so once all 8 workers are
	 * busy a further {@code execute} is rejected (see the abort policy set in
	 * the constructor) and the new connection is dropped by {@link #run()}.
	 */
	private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 8, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );

	/**
	 * Initialize the NetworkHandler by opening the listening socket.
	 *
	 * @param port TCP port to listen on
	 * @param listenAddress local address to bind to, passed straight to
	 *            {@link ServerSocket#ServerSocket(int, int, InetAddress)}
	 * @param parser request parser, handed to the base class
	 * @throws IOException if the server socket cannot be opened
	 */
	public NetworkHandlerTcp( int port, InetAddress listenAddress, RequestParser parser ) throws IOException
	{
		super( parser );
		socket = new ServerSocket( port, 10, listenAddress );
		log.info( "Listening on TCP:" + port );
		threadPool.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() );
	}

	/**
	 * Stop accepting connections: closes the listening socket (which makes a
	 * blocked {@code accept()} in {@link #run()} fail and end the loop) and
	 * asks the worker pool to shut down. Clients that are already being served
	 * are not cut off; their workers finish on their own. Safe to call more
	 * than once.
	 */
	public void shutdown()
	{
		try {
			socket.close();
		} catch ( IOException ignored ) {
			// Best effort - nothing useful can be done if closing fails
		}
		// Without this the pool's worker threads would linger after the listener is gone
		threadPool.shutdown();
	}

	/**
	 * Main loop of the accepting thread - wait until a client connects, then
	 * hand the connection to a worker from the pool. The loop ends when
	 * {@code Global.doShutdown} is set or when accepting fails (which includes
	 * the listening socket having been closed by {@link #shutdown()}).
	 */
	@Override
	public void run()
	{
		try {
			while ( !Global.doShutdown ) {
				Socket client;
				try {
					client = socket.accept();
				} catch ( IOException e1 ) {
					log.warn( "ACCEPT fail", e1 );
					break;
				}
				try {
					threadPool.execute( new ClientTask( client ) );
				} catch ( RejectedExecutionException e ) {
					// All workers busy (or pool already shut down): drop this connection
					try {
						client.close();
					} catch ( IOException ignored ) {
						// Best effort - the connection is being discarded anyway
					}
				}
			}
		} finally {
			// NOTE(review): setting the interrupt flag on exit is kept from the original;
			// its consumer is not visible in this file - confirm it is still needed.
			Thread.currentThread().interrupt();
			Global.doShutdown = true;
			// Release the listening socket and the worker pool regardless of why the loop ended
			shutdown();
			log.info( "TCP receiver finished." );
		}
	}

	/**
	 * Serves a single client connection: password handshake first, then a
	 * request/reply loop until the client goes away, sends an invalid frame,
	 * or a reply cannot be written.
	 */
	private class ClientTask implements Runnable
	{
		private final Socket socket;
		private DataOutputStream writer = null;
		private DataInputStream reader = null;

		public ClientTask( Socket client )
		{
			this.socket = client;
		}

		@Override
		public void run()
		{
			try {
				try {
					// Reads block for at most 15 minutes before failing with an IOException
					socket.setSoTimeout( (int)TimeUnit.MINUTES.toMillis( 15 ) );
					reader = new DataInputStream( socket.getInputStream() );
					writer = new DataOutputStream( socket.getOutputStream() );
				} catch ( IOException e ) {
					log.info( "IOException on TCP socket when setting up streams", e );
					return;
				}
				// First message must carry the password
				String payload = readMsg( reader );
				if ( payload == null )
					return;
				int i = payload.indexOf( ' ' );
				if ( i != -1 ) {
					// For future extensibility we throw away everything after the first space (including the space)
					payload = payload.substring( 0, i );
				}
				if ( !payload.equals( Global.PASSWORD ) ) {
					sendMsg( "ERROR,Wrong password" );
					return;
				}
				// Authenticated: handle requests until the connection ends
				while ( !socket.isClosed() ) {
					payload = readMsg( reader );
					if ( payload == null )
						return;
					byte[] reply;
					try {
						reply = parser.handle( payload );
					} catch ( Throwable t ) {
						// A failing request must not kill the connection; no reply is sent for it
						log.error( "Exception in RequestParser", t );
						log.error( "Payload was: " + payload );
						continue;
					}
					if ( reply != null ) {
						sendMsg( reply );
					}
				}
			} catch ( SendException e ) {
				log.warn( "Cannot send reply to client", e );
			} finally {
				closeConnection();
			}
		}

		/**
		 * Flush pending output and close the connection. The writer and the
		 * socket are closed in separate steps so that a failing flush can
		 * never prevent the socket itself from being closed.
		 */
		private void closeConnection()
		{
			if ( writer != null ) {
				try {
					writer.flush();
					writer.close();
				} catch ( IOException ignored ) {
					// Peer is probably gone already; still close the socket below
				}
			}
			try {
				socket.close();
			} catch ( IOException ignored ) {
				// Best effort - nothing left to clean up
			}
		}

		/**
		 * Send a text reply, encoded as UTF-8.
		 *
		 * @throws SendException if writing to the client fails
		 */
		private void sendMsg( String reply ) throws SendException
		{
			sendMsg( reply.getBytes( StandardCharsets.UTF_8 ) );
		}

		/**
		 * Send one frame: 4-byte length followed by the raw bytes.
		 *
		 * @throws SendException if writing to the client fails; the underlying
		 *             {@link IOException} is attached as the cause
		 */
		private void sendMsg( byte[] reply ) throws SendException
		{
			try {
				writer.writeInt( reply.length );
				writer.write( reply );
			} catch ( IOException e ) {
				throw new SendException( e );
			}
		}

		/**
		 * Read one frame from the client.
		 *
		 * @return the payload decoded as UTF-8 (empty string for a zero-length
		 *         frame), or {@code null} if the connection failed or timed
		 *         out, or if the announced length is negative or exceeds
		 *         {@code Global.MAX_REQUEST_SIZE}
		 */
		private String readMsg( DataInputStream reader )
		{
			int bytes;
			try {
				bytes = reader.readInt();
			} catch ( IOException e ) {
				// This should be fine... Client went away
				return null;
			}
			// Validate before allocating so a bogus length cannot trigger a huge allocation
			if ( bytes < 0 || bytes > Global.MAX_REQUEST_SIZE ) {
				log.info( "Invalid request size: " + bytes );
				return null;
			}
			if ( bytes == 0 )
				return ""; // Nothing to read
			byte[] buffer = new byte[ bytes ];
			try {
				reader.readFully( buffer );
			} catch ( IOException e ) {
				log.warn( "Client went away when trying to read payload" );
				return null;
			}
			return new String( buffer, StandardCharsets.UTF_8 );
		}
	}

	/**
	 * Signals that a reply could not be written to the client. Carries the
	 * original I/O failure as its cause so it shows up in the log.
	 */
	private static class SendException extends Exception
	{
		private static final long serialVersionUID = 1L;

		SendException( Throwable cause )
		{
			super( cause );
		}
	}
}
|