diff options
author | Simon Rettberg | 2016-04-20 17:10:14 +0200 |
---|---|---|
committer | Simon Rettberg | 2016-04-20 17:10:14 +0200 |
commit | ecd3d22510aa2f1aa0c44cee015bd690d19f45ce (patch) | |
tree | 8ec91bf9500a9575308898f0f70b5a90f0ba4737 /src/main/java/org/openslx/filetransfer/Listener.java | |
parent | Add queryUploadStatus to master server (diff) | |
download | master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.gz master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.xz master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.zip |
More imgsync stuff
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/Listener.java')
-rw-r--r-- | src/main/java/org/openslx/filetransfer/Listener.java | 86 |
1 files changed, 57 insertions, 29 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java index e6bbb62..f7d4225 100644 --- a/src/main/java/org/openslx/filetransfer/Listener.java +++ b/src/main/java/org/openslx/filetransfer/Listener.java @@ -4,6 +4,11 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLServerSocketFactory; @@ -18,9 +23,10 @@ public class Listener private ServerSocket listenSocket = null; private Thread acceptThread = null; private final int readTimeoutMs; + private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() ); - private static final byte U = 85; // hex - code 'U' = 85. - private static final byte D = 68; // hex - code 'D' = 68. + private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // hex - code 'U' = 85. + private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // hex - code 'D' = 68. private static Logger log = Logger.getLogger( Listener.class ); /***********************************************************************/ @@ -48,8 +54,10 @@ public class Listener * connection, and start Downloader or Uploader. * */ - private boolean listen() + private synchronized boolean listen() { + if ( listenSocket != null ) + return true; try { if ( this.context == null ) { listenSocket = new ServerSocket(); @@ -61,61 +69,81 @@ public class Listener listenSocket.bind( new InetSocketAddress( this.port ) ); } catch ( Exception e ) { log.error( "Cannot listen on port " + this.port, e ); + listenSocket = null; return false; } return true; } - private void run() + private synchronized void run() { + if ( acceptThread != null ) + return; final Listener instance = this; acceptThread = new Thread( "BFTP-Listen-" + this.port ) { @Override public void run() { try { + // Run accept loop in own thread while ( !isInterrupted() ) { - Socket connectionSocket = null; + Socket acceptedSocket = null; try { - connectionSocket = listenSocket.accept(); + acceptedSocket = listenSocket.accept(); } catch ( SocketTimeoutException e ) { continue; } catch ( Exception e ) { log.warn( "Some exception when accepting! Trying to resume...", e ); Transfer.safeClose( listenSocket ); + listenSocket = null; if ( !listen() ) { log.error( "Could not re-open listening socket" ); break; } continue; } - try { - connectionSocket.setSoTimeout( 2000 ); // 2 second timeout enough? Maybe even use a small thread pool for handling accepted connections + // Handle each accepted connection in a thread pool + final Socket connection = acceptedSocket; + Runnable handler = new Runnable() { + @Override + public void run() + { - byte[] b = new byte[ 1 ]; - int length = connectionSocket.getInputStream().read( b ); - if ( length == -1 ) - continue; + try { + // Give initial byte signalling mode of operation 5 secs to arrive + connection.setSoTimeout( 5000 ); - connectionSocket.setSoTimeout( readTimeoutMs ); + byte[] b = new byte[ 1 ]; + int length = connection.getInputStream().read( b ); + if ( length == -1 ) { + Transfer.safeClose( connection ); + return; + } + // Byte arrived, now set desired timeout + connection.setSoTimeout( readTimeoutMs ); - if ( b[0] == U ) { - // --> start Downloader(socket). - Downloader d = new Downloader( connectionSocket ); - incomingEvent.incomingUploadRequest( d ); - } - else if ( b[0] == D ) { - // --> start Uploader(socket). - Uploader u = new Uploader( connectionSocket ); - incomingEvent.incomingDownloadRequest( u ); - } - else { - log.debug( "Got invalid init-byte ... close connection" ); - connectionSocket.close(); + if ( b[0] == CONNECTING_PEER_WANTS_TO_UPLOAD ) { + // --> start Downloader(socket). + Downloader d = new Downloader( connection ); + incomingEvent.incomingUploadRequest( d ); + } else if ( b[0] == CONNECTING_PEER_WANTS_TO_DOWNLOAD ) { + // --> start Uploader(socket). + Uploader u = new Uploader( connection ); + incomingEvent.incomingDownloadRequest( u ); + } else { + log.debug( "Got invalid init-byte ... close connection" ); + Transfer.safeClose( connection ); + } + } catch ( Exception e ) { + log.warn( "Error accepting client", e ); + Transfer.safeClose( connection ); + } } - } catch ( Exception e ) { - log.warn( "Error accepting client", e ); - Transfer.safeClose( connectionSocket ); + }; + try { + processingPool.execute( handler ); + } catch ( RejectedExecutionException e ) { + Transfer.safeClose( acceptedSocket ); } } } finally { |