package org.openslx.imagemaster.serverconnection; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.security.KeyStore; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import org.apache.log4j.Logger; import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.InvocationError; import org.openslx.bwlp.thrift.iface.TInvocationException; import org.openslx.bwlp.thrift.iface.TTransferRejectedException; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.IncomingEvent; import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; import org.openslx.filetransfer.util.AbstractTransfer; import org.openslx.imagemaster.Globals; import org.openslx.util.GrowingThreadPoolExecutor; /** * Class to handle all incoming and outgoing connections. * Also handles the authentication and the saving/delivering of images. */ public class ConnectionHandler implements IncomingEvent { private static final Logger LOGGER = Logger.getLogger( ConnectionHandler.class ); private static final int MAX_TRANSFERS = 12; private static Map incomingTransfers = new ConcurrentHashMap<>(); private static Map outgoingTransfers = new ConcurrentHashMap<>(); private static IncomingEvent eventHandler = new ConnectionHandler(); private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES, new SynchronousQueue(), new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ) ); private static final Listener plainListener; private static final Listener sslListener; static { LOGGER.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() ); Listener ssl = null; Listener plain = null; try { String pathToKeyStore = Globals.getSslKeystoreFile(); char[] passphrase = Globals.getSslKeystorePassword().toCharArray(); KeyStore keystore = KeyStore.getInstance( "JKS" ); keystore.load( new FileInputStream( pathToKeyStore ), passphrase ); KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() ); kmf.init( keystore, passphrase ); SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" ); KeyManager[] keyManagers = kmf.getKeyManagers(); sslContext.init( keyManagers, null, null ); ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 ); ssl.start(); plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 ); plain.start(); } catch ( Exception e ) { LOGGER.error( "Initialization failed.", e ); System.exit( 2 ); } sslListener = ssl; plainListener = plain; } public static IncomingTransfer registerUpload( ImagePublishData img, List blockHashes ) throws TTransferRejectedException, TInvocationException { IncomingTransfer transfer; synchronized ( incomingTransfers ) { transfer = incomingTransfers.get( img.imageVersionId ); if ( transfer == null ) { if ( getUploadConnectionCount() >= MAX_TRANSFERS ) { throw new TTransferRejectedException( "Too many active transfers" ); } try { transfer = new IncomingTransfer( img, blockHashes ); } catch ( FileNotFoundException e ) { LOGGER.warn( "Cannot init download", e ); throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" ); } incomingTransfers.put( transfer.getId(), transfer ); incomingTransfers.put( img.imageVersionId, transfer ); } } return transfer; } public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List crcSums ) throws TTransferRejectedException { IncomingTransfer transfer = incomingTransfers.get( imageData.imageVersionId ); if ( transfer == null ) return null; if ( transfer.getFileSize() != imageData.fileSize ) throw new TTransferRejectedException( "File size mismatch" ); if ( !transfer.hashesEqual( crcSums ) ) throw new TTransferRejectedException( "Block hashes mismatch" ); return transfer; } /** * Server is uploading - client is downloading! */ @Override public void incomingDownloadRequest( final Uploader uploader ) { // TODO uploader.sendErrorCode( "Too many concurrent uploads." ); uploader.cancel(); } /** * Server is downloading - client is uploading! */ @Override public void incomingUploadRequest( final Downloader downloader ) throws IOException { IncomingTransfer transfer = incomingTransfers.get( downloader.getToken() ); if ( transfer == null ) { downloader.sendErrorCode( "Unknown upload token." ); downloader.cancel(); return; } if ( getUploadConnectionCount() >= MAX_TRANSFERS ) { downloader.sendErrorCode( "Too many concurrent uploads." ); downloader.cancel(); return; } if ( !transfer.addConnection( downloader, transferPool ) ) { downloader.cancel(); } } public static int getUploadConnectionCount() { final long now = System.currentTimeMillis(); int active = 0; for ( IncomingTransfer t : incomingTransfers.values() ) { if ( t.countsTowardsConnectionLimit( now ) ) { active += t.getActiveConnectionCount(); } } return active; } }