summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/Listener.java
blob: fc990fc1e94883272eb0c61953bd9cb29a544f4d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package org.openslx.filetransfer;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLServerSocketFactory;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openslx.util.PrioThreadFactory;

/**
 * Listening side of a file transfer. Opens a server socket (plain or SSL,
 * depending on whether an {@link SSLContext} was supplied), accepts
 * connections on a dedicated daemon thread and hands every accepted
 * connection to a small worker pool. The worker reads a single init-byte from
 * the peer that selects the mode of operation and then passes a
 * {@link Downloader} or {@link Uploader} to the registered
 * {@link IncomingEvent}.
 */
public class Listener
{
	private final IncomingEvent incomingEvent;
	private final SSLContext context;
	private final int port;
	// Written only while holding the instance lock; read by the accept thread.
	private ServerSocket listenSocket = null;
	private Thread acceptThread = null;
	private final int readTimeoutMs;
	// Direct hand-off (SynchronousQueue) with at most 8 workers: when all workers
	// are busy, execute() rejects the task and the connection gets closed.
	private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(),
			new PrioThreadFactory( "BFTP-BS" ) );

	private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // ASCII 'U' = 85.
	private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // ASCII 'D' = 68.
	// How long a freshly accepted peer gets to send its init-byte.
	private static final int INITIAL_BYTE_TIMEOUT_MS = 5000;
	private static final Logger log = LogManager.getLogger( Listener.class );

	/***********************************************************************/
	/**
	 * File transfer listener. This is the side that opens a port and waits
	 * for incoming connections.
	 * 
	 * @param e the event handler for incoming connections
	 * @param context the SSL context used for encryption; if null, unencrypted connections will be
	 *           used
	 * @param port port to listen on
	 * @param readTimeoutMs socket read timeout, in milliseconds, applied to accepted connections
	 *           once their init-byte has arrived
	 */
	public Listener( IncomingEvent e, SSLContext context, int port, int readTimeoutMs )
	{
		this.incomingEvent = e;
		this.context = context;
		this.port = port;
		this.readTimeoutMs = readTimeoutMs;
	}

	/***********************************************************************/
	/**
	 * Open and bind the listening socket if it is not open yet. A plain
	 * {@link ServerSocket} is used when no SSL context was given, otherwise
	 * the socket is created by the context's server socket factory.
	 * 
	 * @return true if the listening socket exists after this call, false if
	 *         it could not be created or bound (the error is logged)
	 */
	private synchronized boolean listen()
	{
		if ( listenSocket != null )
			return true;
		try {
			if ( this.context == null ) {
				listenSocket = new ServerSocket();
			} else {
				SSLServerSocketFactory sslServerSocketFactory = context.getServerSocketFactory();
				listenSocket = sslServerSocketFactory.createServerSocket();
			}
			listenSocket.setSoTimeout( 5000 );
			listenSocket.setReuseAddress( true );
			listenSocket.bind( new InetSocketAddress( this.port ) );
			// Timeout 0 means accept() blocks indefinitely
			listenSocket.setSoTimeout( 0 );
		} catch ( Exception e ) {
			log.error( "Cannot listen on port " + this.port, e );
			listenSocket = null;
			return false;
		}
		return true;
	}

	/**
	 * Close the listening socket (if any) and forget it, so that a later
	 * {@link #listen()} creates a fresh one.
	 */
	private synchronized void closeListenSocket()
	{
		Transfer.safeClose( listenSocket );
		listenSocket = null;
	}

	/**
	 * Start the accept thread unless one is already alive. An accept thread
	 * that has terminated (e.g. because the listening socket could not be
	 * re-opened) is replaced by a new one, so that a successful
	 * {@link #start()} always means connections are actually being accepted.
	 */
	private synchronized void run()
	{
		if ( acceptThread != null && acceptThread.isAlive() )
			return;
		acceptThread = new Thread( new Runnable() {
			@Override
			public void run()
			{
				try {
					acceptLoop();
				} finally {
					closeListenSocket();
				}
			}
		}, "BFTP:" + this.port );
		acceptThread.setDaemon( true );
		acceptThread.start();
		log.info( "Starting to accept " + ( this.context == null ? "UNENCRYPTED" : "encrypted" ) + " connections on port " + this.port );
	}

	/**
	 * Accept connections until the current thread is interrupted or the
	 * listening socket fails and cannot be re-opened. Each accepted
	 * connection is handed to the worker pool; if the pool rejects it, the
	 * connection is closed right away.
	 */
	private void acceptLoop()
	{
		while ( !Thread.currentThread().isInterrupted() ) {
			final Socket connection;
			try {
				connection = listenSocket.accept();
			} catch ( SocketTimeoutException e ) {
				continue;
			} catch ( Exception e ) {
				log.warn( "Some exception when accepting! Trying to resume...", e );
				closeListenSocket();
				if ( !listen() ) {
					log.error( "Could not re-open listening socket" );
					break;
				}
				continue;
			}
			// Handle each accepted connection in a thread pool
			Runnable handler = new Runnable() {
				@Override
				public void run()
				{
					handleConnection( connection );
				}
			};
			try {
				processingPool.execute( handler );
			} catch ( RejectedExecutionException e ) {
				// All workers busy (or pool shut down) - drop this client
				Transfer.safeClose( connection );
			}
		}
	}

	/**
	 * Read the init-byte from a freshly accepted connection and dispatch it
	 * to the event handler. On success the event handler is responsible for
	 * the connection; on every failure path the connection is closed here.
	 * 
	 * @param connection the accepted client connection
	 */
	private void handleConnection( Socket connection )
	{
		try {
			// Give initial byte signaling mode of operation 5 secs to arrive
			connection.setSoTimeout( INITIAL_BYTE_TIMEOUT_MS );

			byte[] b = new byte[ 1 ];
			int length;
			try {
				length = connection.getInputStream().read( b );
			} catch ( SocketTimeoutException e ) {
				// Client connected but never sent anything. This is not a
				// SocketException, so it needs its own handler to stay quiet.
				log.debug( "Got no init-byte in time ... closing connection" );
				Transfer.safeClose( connection );
				return;
			}
			if ( length == -1 ) {
				Transfer.safeClose( connection );
				return;
			}
			// Byte arrived, now set desired timeout
			connection.setSoTimeout( readTimeoutMs );

			if ( b[0] == CONNECTING_PEER_WANTS_TO_UPLOAD ) {
				// Peer uploads, so we are the downloading end.
				Downloader d = new Downloader( connection );
				// Will take care of connection cleanup
				incomingEvent.incomingUploadRequest( d );
			} else if ( b[0] == CONNECTING_PEER_WANTS_TO_DOWNLOAD ) {
				// Peer downloads, so we are the uploading end.
				Uploader u = new Uploader( connection );
				// Will take care of connection cleanup
				incomingEvent.incomingDownloadRequest( u );
			} else {
				log.debug( "Got invalid init-byte ... closing connection" );
				Transfer.safeClose( connection );
			}
		} catch ( SSLException e ) {
			Transfer.safeClose( connection );
			log.warn( "SSL error when acceping client " + connection.getInetAddress().getHostAddress() );
		} catch ( SocketException e ) {
			// No reason to log, probably - but the socket must still be released.
			Transfer.safeClose( connection );
		} catch ( Exception e ) {
			Transfer.safeClose( connection );
			log.warn( "Error handling client", e );
		}
	}

	/**
	 * Get the port this listener was configured with.
	 * 
	 * @return the port number passed to the constructor
	 */
	public int getPort()
	{
		return this.port;
	}

	/**
	 * Check whether this listener is running.
	 * 
	 * @return true if this instance is currently listening for connections and runs the accept loop.
	 */
	public synchronized boolean isRunning()
	{
		return acceptThread != null && acceptThread.isAlive() && listenSocket != null && !listenSocket.isClosed();
	}

	/**
	 * Check whether this listener was started.
	 * 
	 * @return true if this instance was started before, but might have been stopped already.
	 */
	public synchronized boolean wasStarted()
	{
		return acceptThread != null;
	}

	/**
	 * Start this listener. Calling this while the listener is already
	 * running has no effect and returns true.
	 * 
	 * @return true if the port could be opened and the accepting thread is running
	 */
	public synchronized boolean start()
	{
		if ( !this.listen() )
			return false;
		this.run();
		return true;
	}
}