diff options
Diffstat (limited to 'src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java')
-rw-r--r-- | src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java | 172 |
1 files changed, 80 insertions, 92 deletions
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java index 44c8e16..141e17f 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java @@ -1,13 +1,15 @@ package org.openslx.imagemaster.serverconnection; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.KeyStore; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.net.ssl.KeyManager; @@ -19,17 +21,13 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.InvocationError; import org.openslx.bwlp.thrift.iface.TInvocationException; import org.openslx.bwlp.thrift.iface.TTransferRejectedException; -import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.IncomingEvent; import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; +import org.openslx.filetransfer.util.AbstractTransfer; import org.openslx.imagemaster.Globals; -import org.openslx.imagemaster.crcchecker.CrcFile; -import org.openslx.imagemaster.db.mappers.DbOsVirt; -import org.openslx.imagemaster.db.mappers.DbUser; -import org.openslx.imagemaster.util.RandomString; -import org.openslx.imagemaster.util.Util; +import org.openslx.util.GrowingThreadPoolExecutor; /** * Class to handle all incoming and outgoing connections. @@ -38,19 +36,24 @@ import org.openslx.imagemaster.util.Util; public class ConnectionHandler implements IncomingEvent { - private static Logger log = Logger.getLogger( ConnectionHandler.class ); - private static SSLContext sslContext; + private static final Logger LOGGER = Logger.getLogger( ConnectionHandler.class ); - private static Map<String, AbstractTransfer> pendingIncomingUploads = new ConcurrentHashMap<>(); - private static Map<String, AbstractTransfer> pendingIncomingDownloads = new ConcurrentHashMap<>(); + private static final int MAX_TRANSFERS = 12; + + private static Map<String, IncomingTransfer> incomingTransfers = new ConcurrentHashMap<>(); + private static Map<String, AbstractTransfer> outgoingTransfers = new ConcurrentHashMap<>(); private static IncomingEvent eventHandler = new ConnectionHandler(); - private static ThreadPoolExecutor uploadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() ); - private static ThreadPoolExecutor downloadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() ); + private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES, + new SynchronousQueue<Runnable>(), + new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ) ); - private static Listener listener; + private static final Listener plainListener; + private static final Listener sslListener; static { - log.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() ); + LOGGER.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() ); + Listener ssl = null; + Listener plain = null; try { String pathToKeyStore = Globals.getSslKeystoreFile(); char[] passphrase = Globals.getSslKeystorePassword().toCharArray(); @@ -58,94 +61,55 @@ public class ConnectionHandler implements IncomingEvent keystore.load( new FileInputStream( pathToKeyStore ), passphrase ); KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() ); kmf.init( keystore, passphrase ); - sslContext = SSLContext.getInstance( "TLSv1.2" ); + SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" ); KeyManager[] keyManagers = kmf.getKeyManagers(); sslContext.init( keyManagers, null, null ); - listener = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 ); - listener.start(); + ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 ); + ssl.start(); + plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 ); + plain.start(); } catch ( Exception e ) { - log.error( "Initialization failed.", e ); + LOGGER.error( "Initialization failed.", e ); System.exit( 2 ); } + sslListener = ssl; + plainListener = plain; } - /** - * Checks if this image is already uploading and returns a new list with missing blocks if so. - * Puts the new image into processing list else. - * - * @param serverSessionId The uploading server - * @param imageData The data of the image - * @return - * @throws UploadException If some error occurred during the process - */ - public static TransferInformation getUploadInfos( ImagePublishData imageData, List<Integer> crcSums ) + public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes ) throws TTransferRejectedException, TInvocationException { - // check image data - if ( Util.isEmpty( imageData.imageName ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Image name not set" ); - if ( !DbUser.exists( imageData.user ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Invalid or missing image owner" ); - if ( DbOsVirt.osExists( imageData.osId ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Content operating system not set" ); - if ( DbOsVirt.virtExists( imageData.virtId ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Content virtualizer system not set" ); - if ( imageData.fileSize <= 0 ) - throw new TInvocationException( InvocationError.INVALID_DATA, "File size is too small" ); - - log.debug( "A satellite is submitting " + imageData.imageVersionId ); - - final String uuid = imageData.imageVersionId; - final String filepathRelative; - final CrcFile crcFile; - if ( crcSums == null ) { - crcFile = null; - } else { - crcFile = new CrcFile( crcSums ); - } - ImagePublishData image; - - synchronized ( pendingIncomingUploads ) { - /* - // check if image is already uploading - if ( ( image = uploadingImages.get( uuid ) ) == null ) { - // TODO insert new image to DB - uploadingImages.put( uuid, image ); + IncomingTransfer transfer; + synchronized ( incomingTransfers ) { + transfer = incomingTransfers.get( img.imageVersionId ); + if ( transfer == null ) { + if ( getUploadConnectionCount() >= MAX_TRANSFERS ) { + throw new TTransferRejectedException( "Too many active transfers" ); + } + try { + transfer = new IncomingTransfer( img, blockHashes ); + } catch ( FileNotFoundException e ) { + LOGGER.warn( "Cannot init download", e ); + throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" ); + } + incomingTransfers.put( transfer.getId(), transfer ); + incomingTransfers.put( img.imageVersionId, transfer ); } - */ } - - final String token = RandomString.generate( 50, false ); - - // TODO addUpload( token, image ); - // TODO Set crc file on image - if there is already a crc file assigned, this does nothing - return new TransferInformation( token, Globals.getFiletransferPortPlain(), Globals.getFiletransferPortSsl() ); - } - - /** - * Add a new allowed incoming upload connection - * for the given token and image. - * - * @param token The unique token - * @param image Image being uploaded - */ - public static void addUpload( String token, AbstractTransfer image ) - { - pendingIncomingUploads.put( token, image ); - log.debug( "Added upload" ); + return transfer; } - /** - * Add a new allowed incoming download connection - * for the given token and image. - * - * @param token The unique token - * @param image Image being uploaded - */ - public static void addDownload( String token, AbstractTransfer image ) + public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List<ByteBuffer> crcSums ) + throws TTransferRejectedException { - pendingIncomingDownloads.put( token, image ); - log.debug( "Added download" ); + IncomingTransfer transfer = incomingTransfers.get( imageData.imageVersionId ); + if ( transfer == null ) + return null; + if ( transfer.getFileSize() != imageData.fileSize ) + throw new TTransferRejectedException( "File size mismatch" ); + if ( !transfer.hashesEqual( crcSums ) ) + throw new TTransferRejectedException( "Block hashes mismatch" ); + return transfer; } /** @@ -165,8 +129,32 @@ public class ConnectionHandler implements IncomingEvent @Override public void incomingUploadRequest( final Downloader downloader ) throws IOException { - // TODO - downloader.sendErrorCode( "Too many concurrent downloads." ); - downloader.cancel(); + IncomingTransfer transfer = incomingTransfers.get( downloader.getToken() ); + if ( transfer == null ) { + downloader.sendErrorCode( "Unknown upload token." ); + downloader.cancel(); + return; + } + if ( getUploadConnectionCount() >= MAX_TRANSFERS ) { + downloader.sendErrorCode( "Too many concurrent uploads." ); + downloader.cancel(); + return; + } + if ( !transfer.addConnection( downloader, transferPool ) ) { + downloader.cancel(); + } } + + public static int getUploadConnectionCount() + { + final long now = System.currentTimeMillis(); + int active = 0; + for ( IncomingTransfer t : incomingTransfers.values() ) { + if ( t.countsTowardsConnectionLimit( now ) ) { + active += t.getActiveConnectionCount(); + } + } + return active; + } + } |