summaryrefslogblamecommitdiffstats
path: root/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
blob: f32c655c1d4112cc91a6fa3cd531719be4c3e2ad (plain) (tree)
1
2
3
4
5
6
7
8
9
10

                                                 
                    
                               
                                     
                           
                           
                              

                             
                      
                     
                                              
                                            
                                             
                                     





                                       



                                                                
                                                         




                                              

                                                   
                                                  

                                        


                                                         
                                                                       


                                                       
 
                                                                                         
 

                                                    



                                                                                                                    
                                                                            


                                                                                                                              
 

                                                    
 
                
                                                                                                                                            

                                      
                     





                                                                                                                         
                                                                                    

                                                                        



                                                                                                                                                  
                                                                            
                                         
                                                                    
                                         
                 

                                      


































                                                                                                                                          
         
 









                                                                                                                                        

                                                                               
                                          

                                                                                          



                                                                                                            







                                                                                                                                            
                                     
                                                                                                                                          
                                                                     
                                                                                                                 

                                                                                                                                     





                                                                                                                                                             
                         
                 
                                
         
 

                                                                                                                
         
                                                                                                         






                                                                                        

         




                                                                             



                                                       
                                                                      
         













                                                                                         
         
 



                                                       
                                                                                           
         
                                                                                                       












                                                                                   
         




                                                            
                                                                                     






                                                                       


















































                                                                                                                                                     
 
package org.openslx.imagemaster.serverconnection;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

import org.apache.log4j.Logger;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.InvocationError;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.IncomingEvent;
import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
import org.openslx.imagemaster.Globals;
import org.openslx.imagemaster.db.mappers.DbImage;
import org.openslx.thrifthelper.ImagePublishDataEx;
import org.openslx.util.GrowingThreadPoolExecutor;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;

/**
 * Class to handle all incoming and outgoing connections.
 * Also handles the authentication and the saving/delivering of images.
 * <p>
 * On class initialization, one SSL and one plain {@link Listener} are started on the ports
 * configured in {@link Globals}; a single instance of this class is registered with both as the
 * {@link IncomingEvent} callback. If that setup fails, the JVM is terminated with exit code 2.
 * All bookkeeping of running transfers is kept in static maps, so the public API of this class
 * is static.
 */
public class ConnectionHandler implements IncomingEvent
{

	private static final Logger LOGGER = Logger.getLogger( ConnectionHandler.class );

	/**
	 * Upper bound for the summed active connection count of all transfers, checked separately
	 * for the upload and the download direction.
	 */
	private static final int MAX_TRANSFERS = 12;

	/** Incoming transfers (satellite -> this server), keyed by transfer id/token. Also used as lock in registerUpload. */
	private static final Map<String, IncomingTransfer> incomingTransfersByTransferId = new ConcurrentHashMap<>();
	/** The same incoming transfers, keyed by the image version id they belong to. */
	private static final Map<String, IncomingTransfer> incomingTransfersByVersionId = new ConcurrentHashMap<>();

	/** Outgoing transfers (this server -> satellite), keyed by transfer id/token. Also used as lock in registerDownload. */
	private static final Map<String, OutgoingTransfer> outgoingTransfers = new ConcurrentHashMap<>();
	/** The one handler instance passed to both listeners. Must be created before the static block below runs. */
	private static final IncomingEvent eventHandler = new ConnectionHandler();
	/**
	 * Executor handed to the transfers when a new connection is added.
	 * NOTE(review): arguments presumably mean core size 1, max size MAX_TRANSFERS * 2 and
	 * 1 minute idle time - confirm against GrowingThreadPoolExecutor.
	 */
	private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES,
			new SynchronousQueue<Runnable>(),
			new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ) );

	private static final Listener plainListener;
	private static final Listener sslListener;

	static {
		LOGGER.debug( "Starting BFTP on port " + Globals.getFiletransferPortSsl() + "+ and " + Globals.getFiletransferPortPlain() );
		Listener ssl = null;
		Listener plain = null;
		try {
			String pathToKeyStore = Globals.getSslKeystoreFile();
			char[] passphrase = Globals.getSslKeystorePassword().toCharArray();
			KeyStore keystore = KeyStore.getInstance( "JKS" );
			// KeyStore.load() does not close the stream it is given, so close it ourselves
			try ( FileInputStream keystoreStream = new FileInputStream( pathToKeyStore ) ) {
				keystore.load( keystoreStream, passphrase );
			}
			KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() );
			kmf.init( keystore, passphrase );
			SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" );
			KeyManager[] keyManagers = kmf.getKeyManagers();
			sslContext.init( keyManagers, null, null );
			ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 );
			ssl.start();
			plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 );
			plain.start();
			// TODO: Bail out/retry if failed, getters for ports
		} catch ( Exception e ) {
			// Without listeners this class is useless, so give up entirely
			LOGGER.error( "Initialization failed.", e );
			System.exit( 2 );
		}
		sslListener = ssl;
		plainListener = plain;
		// Periodically drop finished or idle incoming transfers from both lookup maps.
		// NOTE(review): 10000/301000 are presumably initial delay and period in ms - confirm against QuickTimer.
		QuickTimer.scheduleAtFixedDelay( new Task() {

			@Override
			public void fire()
			{
				long now = System.currentTimeMillis();
				removeFinishedUploads( incomingTransfersByTransferId, now, true );
				removeFinishedUploads( incomingTransfersByVersionId, now, false );
			}
		}, 10000, 301000 );
	}

	/**
	 * Remove every transfer from the given map that reports being complete or having reached
	 * its idle timeout at the given point in time.
	 * 
	 * @param transfers map to clean up; entries are removed through the values iterator
	 * @param now current time in milliseconds, passed on to the transfers' checks
	 * @param logRemoval whether to write a debug message for each removed transfer
	 */
	private static void removeFinishedUploads( Map<String, IncomingTransfer> transfers, long now, boolean logRemoval )
	{
		for ( Iterator<IncomingTransfer> it = transfers.values().iterator(); it.hasNext(); ) {
			IncomingTransfer t = it.next();
			if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) {
				if ( logRemoval ) {
					LOGGER.debug( "Removing transfer " + t.getId() );
				}
				it.remove();
			}
		}
	}

	/**
	 * @return port of the SSL listener, or 0 if that listener is not running
	 */
	public static int getSslPort()
	{
		if ( sslListener.isRunning() )
			return sslListener.getPort();
		return 0;
	}

	/**
	 * @return port of the plain listener, or 0 if that listener is not running
	 */
	public static int getPlainPort()
	{
		if ( plainListener.isRunning() )
			return plainListener.getPort();
		return 0;
	}

	/**
	 * Register new incoming transfer from a satellite server.
	 * If a transfer for the same image version id is already registered, that one is returned
	 * instead of creating a new one.
	 * 
	 * @param img the desired meta data for the new image/version, as supplied by the satellite
	 * @param blockHashes list of block hashes for the image
	 * @param existing OPTIONAL if the image version already exists on the server, this is given so
	 *           we can repair/complete the local file, if necessary
	 * @return The assigned transfer object
	 * @throws TTransferRejectedException if a new transfer would be required but the upload
	 *            connection limit is already reached
	 * @throws TInvocationException if the destination file cannot be accessed
	 */
	public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes, ImagePublishDataEx existing )
			throws TTransferRejectedException, TInvocationException
	{
		IncomingTransfer transfer;
		// Lock so that two concurrent requests for the same version cannot both create a transfer
		synchronized ( incomingTransfersByTransferId ) {
			transfer = incomingTransfersByVersionId.get( img.imageVersionId );
			if ( transfer == null ) {
				if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
					throw new TTransferRejectedException( "Too many active transfers" );
				}
				File absDestination;
				if ( existing == null ) {
					// New version: <imageDir>/<imageBaseId>/<imageVersionId>
					absDestination = new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId );
				} else {
					// Known version: reuse the path already stored for it
					absDestination = new File( Globals.getImageDir(), existing.exImagePath );
				}
				// NOTE(review): presumably a no-op if the listener is already running - confirm against Listener.start()
				plainListener.start();
				sslListener.start();
				try {
					transfer = new IncomingTransfer( img, blockHashes, absDestination, getPlainPort(), getSslPort() );
				} catch ( FileNotFoundException e ) {
					LOGGER.warn( "Cannot init download to " + absDestination.toString(), e );
					throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" );
				}
				LOGGER.info( "New incoming upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName + ")" );
				incomingTransfersByTransferId.put( transfer.getId(), transfer );
				incomingTransfersByVersionId.put( img.imageVersionId, transfer );
			} else {
				LOGGER.info( "Another request for existing upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName
						+ ")" );
			}
		}
		return transfer;
	}

	/**
	 * Look up an already registered incoming transfer for the given image version and make sure
	 * it matches the given meta data.
	 * 
	 * @param imageData meta data of the image version; its version id and file size are used
	 * @param crcSums block hashes that have to equal those of the registered transfer
	 * @return the registered transfer, or <code>null</code> if there is none for this version id
	 * @throws TTransferRejectedException if a transfer exists but file size or block hashes differ
	 */
	public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List<ByteBuffer> crcSums )
			throws TTransferRejectedException
	{
		IncomingTransfer transfer = incomingTransfersByVersionId.get( imageData.imageVersionId );
		if ( transfer == null )
			return null;
		if ( transfer.getFileSize() != imageData.fileSize )
			throw new TTransferRejectedException( "File size mismatch" );
		if ( !transfer.hashesEqual( crcSums ) )
			throw new TTransferRejectedException( "Block hashes mismatch" );
		return transfer;
	}

	/**
	 * @param uploadToken transfer id/token of an incoming transfer
	 * @return the matching incoming transfer, or <code>null</code> if the token is unknown
	 */
	public static IncomingTransfer getUploadByToken( String uploadToken )
	{
		return incomingTransfersByTransferId.get( uploadToken );
	}

	/**
	 * Server is uploading - client is downloading!
	 * The connection is cancelled if its token is unknown, if the download connection limit is
	 * reached, or if the transfer refuses the additional connection.
	 */
	@Override
	public void incomingDownloadRequest( final Uploader uploader )
	{
		OutgoingTransfer transfer = outgoingTransfers.get( uploader.getToken() );
		if ( transfer == null ) {
			LOGGER.debug( "Unknown download token received" );
			uploader.sendErrorCode( "Unknown download token." );
			uploader.cancel();
			return;
		}
		if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) {
			uploader.sendErrorCode( "Too many concurrent uploads." );
			uploader.cancel();
			// Must stop here, otherwise the cancelled connection would still be handed to the transfer
			return;
		}
		if ( !transfer.addConnection( uploader, transferPool ) ) {
			uploader.cancel();
		}
	}

	/**
	 * Server is downloading - client is uploading!
	 * The connection is cancelled if its token is unknown, if the upload connection limit is
	 * reached, or if the transfer refuses the additional connection.
	 */
	@Override
	public void incomingUploadRequest( final Downloader downloader ) throws IOException
	{
		IncomingTransfer transfer = incomingTransfersByTransferId.get( downloader.getToken() );
		if ( transfer == null ) {
			downloader.sendErrorCode( "Unknown upload token." );
			downloader.cancel();
			return;
		}
		if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
			downloader.sendErrorCode( "Too many concurrent uploads." );
			downloader.cancel();
			return;
		}
		if ( !transfer.addConnection( downloader, transferPool ) ) {
			downloader.cancel();
		}
	}

	/**
	 * @return sum of the active connection counts of all registered incoming transfers that
	 *         currently count towards the connection limit
	 */
	public static int getUploadConnectionCount()
	{
		final long now = System.currentTimeMillis();
		int active = 0;
		for ( IncomingTransfer t : incomingTransfersByTransferId.values() ) {
			if ( t.countsTowardsConnectionLimit( now ) ) {
				active += t.getActiveConnectionCount();
			}
		}
		return active;
	}

	/**
	 * @return sum of the active connection counts of all registered outgoing transfers that
	 *         currently count towards the connection limit
	 */
	public static int getDownloadConnectionCount()
	{
		final long now = System.currentTimeMillis();
		int active = 0;
		for ( OutgoingTransfer t : outgoingTransfers.values() ) {
			if ( t.countsTowardsConnectionLimit( now ) ) {
				active += t.getActiveConnectionCount();
			}
		}
		return active;
	}

	/**
	 * Remove the given incoming transfer from both lookup maps.
	 * 
	 * @param transfer the transfer to forget; its id and image version id are used as keys
	 */
	public static void removeUpload( IncomingTransfer transfer )
	{
		incomingTransfersByTransferId.remove( transfer.getId() );
		incomingTransfersByVersionId.remove( transfer.getImageVersionId() );
	}

	/**
	 * Register a new outgoing transfer, so a satellite server can download the given image version.
	 * If the image file is missing or does not have the expected size, the image version is
	 * marked as invalid in the database (best effort) and the request is rejected.
	 * 
	 * @param img meta data of the image version to deliver; path, size and version id are used
	 * @return transfer information of the newly registered transfer
	 * @throws TTransferRejectedException if the file is missing, has an unexpected size, or the
	 *            download connection limit is reached
	 * @throws TInvocationException declared for callers; not thrown by the code in this method
	 */
	public static TransferInformation registerDownload( ImagePublishDataEx img ) throws TTransferRejectedException, TInvocationException
	{
		OutgoingTransfer transfer;
		File absSource;
		absSource = new File( Globals.getImageDir(), img.exImagePath );
		if ( !absSource.exists() ) {
			LOGGER.error( absSource.toString() + " missing!" );
			markInvalid( img );
			throw new TTransferRejectedException( "File missing on server" );
		}
		if ( absSource.length() != img.fileSize ) {
			LOGGER.error( absSource.toString() + " has unexpected size (is: " + absSource.length() + ", should: " + img.fileSize + ")" );
			markInvalid( img );
			throw new TTransferRejectedException( "File corrupted on server" );
		}
		synchronized ( outgoingTransfers ) {
			if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) {
				throw new TTransferRejectedException( "Too many active transfers" );
			}
			// NOTE(review): presumably a no-op if the listener is already running - confirm against Listener.start()
			plainListener.start();
			sslListener.start();
			transfer = new OutgoingTransfer( absSource, getPlainPort(), getSslPort() );
			outgoingTransfers.put( transfer.getId(), transfer );
		}
		return transfer.getTransferInfo();
	}

	/**
	 * Flag the given image version as not valid in the database. This is best effort only: a
	 * database error is logged but not propagated, since the caller rejects the request anyway.
	 */
	private static void markInvalid( ImagePublishDataEx img )
	{
		try {
			DbImage.markValid( img.imageVersionId, false );
		} catch ( SQLException e ) {
			LOGGER.warn( "Could not mark image version " + img.imageVersionId + " as invalid in database", e );
		}
	}

}