package org.openslx.imagemaster.serverconnection; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.security.KeyManagementException; import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import org.apache.commons.lang.mutable.MutableInt; import org.apache.log4j.Logger; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.FileRange; import org.openslx.filetransfer.IncomingEvent; import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; import org.openslx.filetransfer.WantRangeCallback; import org.openslx.imagemaster.Globals; import org.openslx.imagemaster.db.DbImage; /** * Class to handle all incoming and outgoing connections. * Also handles the authentication and the saving/delivering of images. */ public class ConnectionHandler implements IncomingEvent { private static Logger log = Logger.getLogger( ConnectionHandler.class ); private static SSLContext sslContext; private static Map pendingIncomingUploads = new ConcurrentHashMap<>(); private static Map pendingIncomingDownloads = new ConcurrentHashMap<>(); private static IncomingEvent eventHandler = new ConnectionHandler(); private static ThreadPoolExecutor uploadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue() ); private static ThreadPoolExecutor downloadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue() ); private static Listener listener; static { log.debug( "Starting listener on port " + Globals.getSslSocketPort() ); try { String pathToKeyStore = Globals.getSslKeystoreFile(); char[] passphrase = Globals.getSslKeystorePassword().toCharArray(); KeyStore keystore = KeyStore.getInstance( "JKS" ); keystore.load( new FileInputStream( pathToKeyStore ), passphrase ); KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() ); kmf.init( keystore, passphrase ); sslContext = SSLContext.getInstance( "SSLv3" ); KeyManager[] keyManagers = kmf.getKeyManagers(); sslContext.init( keyManagers, null, null ); listener = new Listener( eventHandler, sslContext, Globals.getSslSocketPort() ); listener.start(); } catch ( FileNotFoundException e ) { log.error( "Could not find keystore." ); System.exit( 2 ); } catch ( KeyStoreException e ) { log.error( "KeyStore implemenation not supported." ); System.exit( 2 ); } catch ( NoSuchAlgorithmException e ) { log.error( "Could not find such Algorithm" ); System.exit( 2 ); } catch ( CertificateException e ) { log.error( "Certificate unvalid." ); System.exit( 2 ); } catch ( IOException e ) { log.error( "Could not read keyfile" ); System.exit( 2 ); } catch ( UnrecoverableKeyException e ) { log.error( "Key in keystore is not valid" ); System.exit( 2 ); } catch ( KeyManagementException e ) { log.error( "Context initialization failed." ); System.exit( 2 ); } } /** * Add a new allowed incoming upload connection * for the given token and image. * * @param token The unique token * @param image Image being uploaded */ public static void addUpload( String token, UploadingImage image ) { pendingIncomingUploads.put( token, image ); log.debug( "Added upload" ); } /** * Add a new allowed incoming download connection * for the given token and image. * * @param token The unique token * @param image Image being uploaded */ public static void addDownload( String token, DbImage image ) { pendingIncomingDownloads.put( token, image ); log.debug( "Added download" ); } /** * Server is uploading - client is downloading! */ @Override public void incomingUploader( final Uploader uploader ) { String token = uploader.getToken(); // check token to identify the client if ( token == null ) { uploader.sendErrorCode( "No token available." ); uploader.close( null ); return; } final DbImage image = pendingIncomingDownloads.remove( token ); if ( image == null ) { uploader.sendErrorCode( "Token not accepted." ); uploader.close( null ); return; } try { uploadPool.execute( new Runnable() { @Override public void run() { uploader.upload( image.getAbsolutePath() ); } } ); } catch ( RejectedExecutionException e ) { uploader.sendErrorCode( "Too many concurrent uploads." ); uploader.close( null ); } } /** * Server is downloading - client is uploading! */ @Override public void incomingDownloader( final Downloader downloader ) throws IOException { log.debug( "Client wants to upload" ); String token = downloader.getToken(); if ( token == null ) { downloader.sendErrorCode( "No token available." ); downloader.close( null ); return; } final UploadingImage image = pendingIncomingUploads.remove( token ); if ( image == null ) { downloader.sendErrorCode( "Token not accepted." ); downloader.close( null ); return; } final MutableInt lastBlock = new MutableInt( -1 ); try { downloadPool.execute( new Runnable() { @Override public void run() { downloader.download( image.getAbsolutePath(), new WantRangeCallback() { @Override public FileRange get() { if ( lastBlock.intValue() != -1 ) { image.setNeedsCheck( lastBlock.intValue() ); image.increaseTransmittedTimes( lastBlock.intValue() ); } // get start of range. int blockNumber = image.getNextMissingBlock(); if ( blockNumber == -1 ) { log.debug( "Download complete." ); return null; } lastBlock.setValue( blockNumber ); log.debug( "Block " + blockNumber + " was transmitted " + image.getTimesTransmitted( blockNumber ) + " time(s)." ); long startOfRange = image.getNextMissingBlock() * Globals.blockSize; long endOfRange = Math.min( startOfRange + Globals.blockSize, image.getFileSize() ); FileRange range = new FileRange( startOfRange, endOfRange ); return range; } } ); image.updateDb(); } } ); } catch ( RejectedExecutionException e ) { downloader.sendErrorCode( "Too many concurrent downloads." ); downloader.close( null ); } } }