summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/Listener.java
diff options
context:
space:
mode:
authorSimon Rettberg2016-04-20 17:10:14 +0200
committerSimon Rettberg2016-04-20 17:10:14 +0200
commitecd3d22510aa2f1aa0c44cee015bd690d19f45ce (patch)
tree8ec91bf9500a9575308898f0f70b5a90f0ba4737 /src/main/java/org/openslx/filetransfer/Listener.java
parentAdd queryUploadStatus to master server (diff)
downloadmaster-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.gz
master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.xz
master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.zip
More imgsync stuff
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/Listener.java')
-rw-r--r--src/main/java/org/openslx/filetransfer/Listener.java86
1 files changed, 57 insertions, 29 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java
index e6bbb62..f7d4225 100644
--- a/src/main/java/org/openslx/filetransfer/Listener.java
+++ b/src/main/java/org/openslx/filetransfer/Listener.java
@@ -4,6 +4,11 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocketFactory;
@@ -18,9 +23,10 @@ public class Listener
private ServerSocket listenSocket = null;
private Thread acceptThread = null;
private final int readTimeoutMs;
+ private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
- private static final byte U = 85; // hex - code 'U' = 85.
- private static final byte D = 68; // hex - code 'D' = 68.
+ private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // hex - code 'U' = 85.
+ private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // hex - code 'D' = 68.
private static Logger log = Logger.getLogger( Listener.class );
/***********************************************************************/
@@ -48,8 +54,10 @@ public class Listener
* connection, and start Downloader or Uploader.
*
*/
- private boolean listen()
+ private synchronized boolean listen()
{
+ if ( listenSocket != null )
+ return true;
try {
if ( this.context == null ) {
listenSocket = new ServerSocket();
@@ -61,61 +69,81 @@ public class Listener
listenSocket.bind( new InetSocketAddress( this.port ) );
} catch ( Exception e ) {
log.error( "Cannot listen on port " + this.port, e );
+ listenSocket = null;
return false;
}
return true;
}
- private void run()
+ private synchronized void run()
{
+ if ( acceptThread != null )
+ return;
final Listener instance = this;
acceptThread = new Thread( "BFTP-Listen-" + this.port ) {
@Override
public void run()
{
try {
+ // Run accept loop in own thread
while ( !isInterrupted() ) {
- Socket connectionSocket = null;
+ Socket acceptedSocket = null;
try {
- connectionSocket = listenSocket.accept();
+ acceptedSocket = listenSocket.accept();
} catch ( SocketTimeoutException e ) {
continue;
} catch ( Exception e ) {
log.warn( "Some exception when accepting! Trying to resume...", e );
Transfer.safeClose( listenSocket );
+ listenSocket = null;
if ( !listen() ) {
log.error( "Could not re-open listening socket" );
break;
}
continue;
}
- try {
- connectionSocket.setSoTimeout( 2000 ); // 2 second timeout enough? Maybe even use a small thread pool for handling accepted connections
+ // Handle each accepted connection in a thread pool
+ final Socket connection = acceptedSocket;
+ Runnable handler = new Runnable() {
+ @Override
+ public void run()
+ {
- byte[] b = new byte[ 1 ];
- int length = connectionSocket.getInputStream().read( b );
- if ( length == -1 )
- continue;
+ try {
+ // Give initial byte signalling mode of operation 5 secs to arrive
+ connection.setSoTimeout( 5000 );
- connectionSocket.setSoTimeout( readTimeoutMs );
+ byte[] b = new byte[ 1 ];
+ int length = connection.getInputStream().read( b );
+ if ( length == -1 ) {
+ Transfer.safeClose( connection );
+ return;
+ }
+ // Byte arrived, now set desired timeout
+ connection.setSoTimeout( readTimeoutMs );
- if ( b[0] == U ) {
- // --> start Downloader(socket).
- Downloader d = new Downloader( connectionSocket );
- incomingEvent.incomingUploadRequest( d );
- }
- else if ( b[0] == D ) {
- // --> start Uploader(socket).
- Uploader u = new Uploader( connectionSocket );
- incomingEvent.incomingDownloadRequest( u );
- }
- else {
- log.debug( "Got invalid init-byte ... close connection" );
- connectionSocket.close();
+ if ( b[0] == CONNECTING_PEER_WANTS_TO_UPLOAD ) {
+ // --> start Downloader(socket).
+ Downloader d = new Downloader( connection );
+ incomingEvent.incomingUploadRequest( d );
+ } else if ( b[0] == CONNECTING_PEER_WANTS_TO_DOWNLOAD ) {
+ // --> start Uploader(socket).
+ Uploader u = new Uploader( connection );
+ incomingEvent.incomingDownloadRequest( u );
+ } else {
+ log.debug( "Got invalid init-byte ... close connection" );
+ Transfer.safeClose( connection );
+ }
+ } catch ( Exception e ) {
+ log.warn( "Error accepting client", e );
+ Transfer.safeClose( connection );
+ }
}
- } catch ( Exception e ) {
- log.warn( "Error accepting client", e );
- Transfer.safeClose( connectionSocket );
+ };
+ try {
+ processingPool.execute( handler );
+ } catch ( RejectedExecutionException e ) {
+ Transfer.safeClose( acceptedSocket );
}
}
} finally {