package org.openslx.imagemaster.serverconnection; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.security.KeyStore; import java.sql.SQLException; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import org.apache.log4j.Logger; import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.InvocationError; import org.openslx.bwlp.thrift.iface.TInvocationException; import org.openslx.bwlp.thrift.iface.TTransferRejectedException; import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.IncomingEvent; import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; import org.openslx.imagemaster.Globals; import org.openslx.imagemaster.db.mappers.DbImage; import org.openslx.thrifthelper.ImagePublishDataEx; import org.openslx.util.GrowingThreadPoolExecutor; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; /** * Class to handle all incoming and outgoing connections. * Also handles the authentication and the saving/delivering of images. */ public class ConnectionHandler implements IncomingEvent { private static final Logger LOGGER = Logger.getLogger( ConnectionHandler.class ); private static final int MAX_TRANSFERS = 12; private static Map incomingTransfersByTransferId = new ConcurrentHashMap<>(); private static final Map incomingTransfersByVersionId = new ConcurrentHashMap<>(); private static Map outgoingTransfers = new ConcurrentHashMap<>(); private static IncomingEvent eventHandler = new ConnectionHandler(); private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES, new SynchronousQueue(), new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ) ); private static final Listener plainListener; private static final Listener sslListener; static { LOGGER.debug( "Starting BFTP on port " + Globals.getFiletransferPortSsl() + "+ and " + Globals.getFiletransferPortPlain() ); Listener ssl = null; Listener plain = null; try { String pathToKeyStore = Globals.getSslKeystoreFile(); char[] passphrase = Globals.getSslKeystorePassword().toCharArray(); KeyStore keystore = KeyStore.getInstance( "JKS" ); keystore.load( new FileInputStream( pathToKeyStore ), passphrase ); KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() ); kmf.init( keystore, passphrase ); SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" ); KeyManager[] keyManagers = kmf.getKeyManagers(); sslContext.init( keyManagers, null, null ); ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 ); ssl.start(); plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 ); plain.start(); // TODO: Bail out/retry if failed, getters for ports } catch ( Exception e ) { LOGGER.error( "Initialization failed.", e ); System.exit( 2 ); } sslListener = ssl; plainListener = plain; QuickTimer.scheduleAtFixedDelay( new Task() { @Override public void fire() { long now = System.currentTimeMillis(); for ( Iterator it = incomingTransfersByTransferId.values().iterator(); it.hasNext(); ) { IncomingTransfer t = it.next(); if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) { LOGGER.debug( "Removing transfer " + t.getId() ); it.remove(); } } for ( Iterator it = incomingTransfersByVersionId.values().iterator(); it.hasNext(); ) { IncomingTransfer t = it.next(); if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) { it.remove(); } } } }, 10000, 301000 ); } public static int getSslPort() { if ( sslListener.isRunning() ) return sslListener.getPort(); return 0; } public static int getPlainPort() { if ( plainListener.isRunning() ) return plainListener.getPort(); return 0; } /** * Register new incoming transfer from a satellite server. * * @param img the desired meta data for the new image/version, as supplied by the satellite * @param blockHashes list of block hashes for the image * @param existing OPTIONAL if the image version already exists on the server, this is given so * we can repair/complete the local file, if necessary * @return The assigned transfer object */ public static IncomingTransfer registerUpload( ImagePublishData img, List blockHashes, ImagePublishDataEx existing ) throws TTransferRejectedException, TInvocationException { IncomingTransfer transfer; synchronized ( incomingTransfersByTransferId ) { transfer = incomingTransfersByVersionId.get( img.imageVersionId ); if ( transfer == null ) { if ( getUploadConnectionCount() >= MAX_TRANSFERS ) { throw new TTransferRejectedException( "Too many active transfers" ); } File absDestination; if ( existing == null ) { absDestination = new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId ); } else { absDestination = new File( Globals.getImageDir(), existing.exImagePath ); } plainListener.start(); sslListener.start(); try { transfer = new IncomingTransfer( img, blockHashes, absDestination, getPlainPort(), getSslPort() ); } catch ( FileNotFoundException e ) { LOGGER.warn( "Cannot init download to " + absDestination.toString(), e ); throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" ); } LOGGER.info( "New incoming upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName + ")" ); incomingTransfersByTransferId.put( transfer.getId(), transfer ); incomingTransfersByVersionId.put( img.imageVersionId, transfer ); } else { LOGGER.info( "Another request for existing upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName + ")" ); } } return transfer; } public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List crcSums ) throws TTransferRejectedException { IncomingTransfer transfer = incomingTransfersByVersionId.get( imageData.imageVersionId ); if ( transfer == null ) return null; if ( transfer.getFileSize() != imageData.fileSize ) throw new TTransferRejectedException( "File size mismatch" ); if ( !transfer.hashesEqual( crcSums ) ) throw new TTransferRejectedException( "Block hashes mismatch" ); return transfer; } public static IncomingTransfer getUploadByToken( String uploadToken ) { return incomingTransfersByTransferId.get( uploadToken ); } /** * Server is uploading - client is downloading! */ @Override public void incomingDownloadRequest( final Uploader uploader ) { OutgoingTransfer transfer = outgoingTransfers.get( uploader.getToken() ); if ( transfer == null ) { LOGGER.debug( "Unknown download token received" ); uploader.sendErrorCode( "Unknown download token." ); uploader.cancel(); return; } if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) { uploader.sendErrorCode( "Too many concurrent uploads." ); uploader.cancel(); } if ( !transfer.addConnection( uploader, transferPool ) ) { uploader.cancel(); } } /** * Server is downloading - client is uploading! */ @Override public void incomingUploadRequest( final Downloader downloader ) throws IOException { IncomingTransfer transfer = incomingTransfersByTransferId.get( downloader.getToken() ); if ( transfer == null ) { downloader.sendErrorCode( "Unknown upload token." ); downloader.cancel(); return; } if ( getUploadConnectionCount() >= MAX_TRANSFERS ) { downloader.sendErrorCode( "Too many concurrent uploads." ); downloader.cancel(); return; } if ( !transfer.addConnection( downloader, transferPool ) ) { downloader.cancel(); } } public static int getUploadConnectionCount() { final long now = System.currentTimeMillis(); int active = 0; for ( IncomingTransfer t : incomingTransfersByTransferId.values() ) { if ( t.countsTowardsConnectionLimit( now ) ) { active += t.getActiveConnectionCount(); } } return active; } public static int getDownloadConnectionCount() { final long now = System.currentTimeMillis(); int active = 0; for ( OutgoingTransfer t : outgoingTransfers.values() ) { if ( t.countsTowardsConnectionLimit( now ) ) { active += t.getActiveConnectionCount(); } } return active; } public static void removeUpload( IncomingTransfer transfer ) { incomingTransfersByTransferId.remove( transfer.getId() ); incomingTransfersByVersionId.remove( transfer.getImageVersionId() ); } public static TransferInformation registerDownload( ImagePublishDataEx img ) throws TTransferRejectedException, TInvocationException { OutgoingTransfer transfer; File absSource; absSource = new File( Globals.getImageDir(), img.exImagePath ); if ( !absSource.exists() ) { LOGGER.error( absSource.toString() + " missing!" ); try { DbImage.markValid( img.imageVersionId, false ); } catch ( SQLException e ) { } throw new TTransferRejectedException( "File missing on server" ); } if ( absSource.length() != img.fileSize ) { LOGGER.error( absSource.toString() + " has unexpected size (is: " + absSource.length() + ", should: " + img.fileSize + ")" ); try { DbImage.markValid( img.imageVersionId, false ); } catch ( SQLException e ) { } throw new TTransferRejectedException( "File corrupted on server" ); } synchronized ( outgoingTransfers ) { if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) { throw new TTransferRejectedException( "Too many active transfers" ); } plainListener.start(); sslListener.start(); transfer = new OutgoingTransfer( absSource, getPlainPort(), getSslPort() ); outgoingTransfers.put( transfer.getId(), transfer ); } return transfer.getTransferInfo(); } }