summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristoph Schulthess2017-02-01 14:54:59 +0100
committerChristoph Schulthess2017-02-01 14:54:59 +0100
commit833839dbd3bf368c7b1a5ebdfaf073e393174303 (patch)
tree16319d8cc3822dbedf925696c146b2b8ba1f880e
parentmeaningful comments (diff)
downloadtmlite-bwlp-833839dbd3bf368c7b1a5ebdfaf073e393174303.tar.gz
tmlite-bwlp-833839dbd3bf368c7b1a5ebdfaf073e393174303.tar.xz
tmlite-bwlp-833839dbd3bf368c7b1a5ebdfaf073e393174303.zip
DispatchRelay Task
-rw-r--r--src/main/java/org/openslx/taskmanager/tasks/DispatchRelay.java163
1 files changed, 163 insertions, 0 deletions
diff --git a/src/main/java/org/openslx/taskmanager/tasks/DispatchRelay.java b/src/main/java/org/openslx/taskmanager/tasks/DispatchRelay.java
new file mode 100644
index 0000000..cddf1e5
--- /dev/null
+++ b/src/main/java/org/openslx/taskmanager/tasks/DispatchRelay.java
@@ -0,0 +1,163 @@
+package org.openslx.taskmanager.tasks;
+
+import java.net.Socket;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import org.openslx.taskmanager.api.AbstractTask;
+
+import com.google.gson.annotations.Expose;
+
+public class DispatchRelay extends AbstractTask {
+
+ private Output status;
+
+ @Expose
+ private String[] hosts;
+ @Expose
+ private int[] ports;
+ @Expose
+ private String[] descs;
+
+ @Override
+ protected boolean execute()
+ {
+ Socket[] sockets;
+ Thread[] relayThreads;
+ try {
+ sockets = createSockets();
+ status.addMessage( "Sockets connected." );
+ } catch ( IOException iox ) {
+ status.addMessage( "Failed to connect sockets. " + iox.getMessage() );
+ return false;
+ }
+
+ try {
+ relayThreads = createRelayThreads( sockets );
+ status.addMessage( "Relays created." );
+ } catch ( IOException iox ) {
+ status.addMessage( "Failed to create relays. " + iox.getMessage() );
+ return false;
+ }
+
+ for ( Thread t : relayThreads )
+ t.start();
+
+ try {
+ for ( Thread t : relayThreads )
+ t.join();
+ } catch ( InterruptedException inx ) {} finally {
+ try {
+ for ( Socket s : sockets ) {
+ if ( !s.isClosed() )
+ s.close();
+ }
+ status.addMessage( "Sockets closed properly." );
+ } catch ( IOException iox ) {
+ status.addMessage( "Failed to close Sockets." + iox.getMessage() );
+ }
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean initTask()
+ {
+ this.setStatusObject( status );
+
+ if ( hosts.length != 2 || ports.length != 2 ) {
+ status.addMessage( "Invalid host/port list." );
+ return false;
+ }
+ return true;
+ }
+
+ protected Thread[] createRelayThreads ( Socket[] sockets) throws IOException
+ {
+ Thread[] t = new Thread[2];
+
+ for ( int i = 0; i < 2; i++ ) {
+ t[i] = new Thread( new Relay( sockets, status, descs[i] ));
+ }
+ return t;
+ }
+
+ protected Socket[] createSockets () throws IOException
+ {
+ Socket[] s = new Socket[2];
+
+ for ( int i = 0; i < 2; i++ ) {
+ s[i] = new Socket();
+ s[i].connect( new InetSocketAddress( hosts[i], ports[i] ), 1200 );
+ }
+ return s;
+ }
+
+ public static class Output
+ {
+ protected String messages = null;
+
+ public void addMessage( String str )
+ {
+ if ( messages == null )
+ {
+ messages = str;
+ } else {
+ messages += "\n" + str;
+ }
+ }
+ }
+
+ private class Relay implements Runnable {
+
+ private InputStream in;
+ private OutputStream out;
+
+ private Output status;
+ public final String desc;
+
+ private byte[] buffer = new byte[16384];
+
+ public Relay ( Socket[] s, Output status, String desc ) throws IOException
+ {
+ in = s[0].getInputStream();
+ out = s[1].getOutputStream();
+ this.status = status;
+ this.desc = desc;
+ }
+
+ private void relay() throws IOException, InterruptedException
+ {
+ int readBytes = in.read( buffer );
+ out.write( buffer, 0, readBytes );
+ }
+
+ public void close()
+ {
+ try {
+ if ( this.in != null )
+ this.in.close();
+ if ( this.out != null )
+ this.out.close();
+ this.status.addMessage( desc + ": input/output streams closed properly." );
+ } catch ( IOException iox ) {
+ this.status.addMessage( desc + ": failed to close input/ouput streams. " + iox.getMessage() );
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ while ( true ) {
+ try {
+ relay();
+ this.status.addMessage( desc + ": relay operating." );
+ } catch ( Exception x ) {
+ this.status.addMessage( desc + ": close relay. " + x.getMessage() );
+ this.close();
+ }
+ }
+ }
+ }
+}