summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java')
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java172
1 files changed, 80 insertions, 92 deletions
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
index 44c8e16..141e17f 100644
--- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
@@ -1,13 +1,15 @@
package org.openslx.imagemaster.serverconnection;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManager;
@@ -19,17 +21,13 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.InvocationError;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
-import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.IncomingEvent;
import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
+import org.openslx.filetransfer.util.AbstractTransfer;
import org.openslx.imagemaster.Globals;
-import org.openslx.imagemaster.crcchecker.CrcFile;
-import org.openslx.imagemaster.db.mappers.DbOsVirt;
-import org.openslx.imagemaster.db.mappers.DbUser;
-import org.openslx.imagemaster.util.RandomString;
-import org.openslx.imagemaster.util.Util;
+import org.openslx.util.GrowingThreadPoolExecutor;
/**
* Class to handle all incoming and outgoing connections.
@@ -38,19 +36,24 @@ import org.openslx.imagemaster.util.Util;
public class ConnectionHandler implements IncomingEvent
{
- private static Logger log = Logger.getLogger( ConnectionHandler.class );
- private static SSLContext sslContext;
+ private static final Logger LOGGER = Logger.getLogger( ConnectionHandler.class );
- private static Map<String, AbstractTransfer> pendingIncomingUploads = new ConcurrentHashMap<>();
- private static Map<String, AbstractTransfer> pendingIncomingDownloads = new ConcurrentHashMap<>();
+ private static final int MAX_TRANSFERS = 12;
+
+ private static Map<String, IncomingTransfer> incomingTransfers = new ConcurrentHashMap<>();
+ private static Map<String, AbstractTransfer> outgoingTransfers = new ConcurrentHashMap<>();
private static IncomingEvent eventHandler = new ConnectionHandler();
- private static ThreadPoolExecutor uploadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
- private static ThreadPoolExecutor downloadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
+ private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>(),
+ new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ) );
- private static Listener listener;
+ private static final Listener plainListener;
+ private static final Listener sslListener;
static {
- log.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() );
+ LOGGER.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() );
+ Listener ssl = null;
+ Listener plain = null;
try {
String pathToKeyStore = Globals.getSslKeystoreFile();
char[] passphrase = Globals.getSslKeystorePassword().toCharArray();
@@ -58,94 +61,55 @@ public class ConnectionHandler implements IncomingEvent
keystore.load( new FileInputStream( pathToKeyStore ), passphrase );
KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() );
kmf.init( keystore, passphrase );
- sslContext = SSLContext.getInstance( "TLSv1.2" );
+ SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" );
KeyManager[] keyManagers = kmf.getKeyManagers();
sslContext.init( keyManagers, null, null );
- listener = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 );
- listener.start();
+ ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 );
+ ssl.start();
+ plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 );
+ plain.start();
} catch ( Exception e ) {
- log.error( "Initialization failed.", e );
+ LOGGER.error( "Initialization failed.", e );
System.exit( 2 );
}
+ sslListener = ssl;
+ plainListener = plain;
}
- /**
- * Checks if this image is already uploading and returns a new list with missing blocks if so.
- * Puts the new image into processing list else.
- *
- * @param serverSessionId The uploading server
- * @param imageData The data of the image
- * @return
- * @throws UploadException If some error occurred during the process
- */
- public static TransferInformation getUploadInfos( ImagePublishData imageData, List<Integer> crcSums )
+ public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes )
throws TTransferRejectedException, TInvocationException
{
- // check image data
- if ( Util.isEmpty( imageData.imageName ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Image name not set" );
- if ( !DbUser.exists( imageData.user ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Invalid or missing image owner" );
- if ( DbOsVirt.osExists( imageData.osId ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Content operating system not set" );
- if ( DbOsVirt.virtExists( imageData.virtId ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Content virtualizer system not set" );
- if ( imageData.fileSize <= 0 )
- throw new TInvocationException( InvocationError.INVALID_DATA, "File size is too small" );
-
- log.debug( "A satellite is submitting " + imageData.imageVersionId );
-
- final String uuid = imageData.imageVersionId;
- final String filepathRelative;
- final CrcFile crcFile;
- if ( crcSums == null ) {
- crcFile = null;
- } else {
- crcFile = new CrcFile( crcSums );
- }
- ImagePublishData image;
-
- synchronized ( pendingIncomingUploads ) {
- /*
- // check if image is already uploading
- if ( ( image = uploadingImages.get( uuid ) ) == null ) {
- // TODO insert new image to DB
- uploadingImages.put( uuid, image );
+ IncomingTransfer transfer;
+ synchronized ( incomingTransfers ) {
+ transfer = incomingTransfers.get( img.imageVersionId );
+ if ( transfer == null ) {
+ if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
+ throw new TTransferRejectedException( "Too many active transfers" );
+ }
+ try {
+ transfer = new IncomingTransfer( img, blockHashes );
+ } catch ( FileNotFoundException e ) {
+ LOGGER.warn( "Cannot init download", e );
+ throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" );
+ }
+ incomingTransfers.put( transfer.getId(), transfer );
+ incomingTransfers.put( img.imageVersionId, transfer );
}
- */
}
-
- final String token = RandomString.generate( 50, false );
-
- // TODO addUpload( token, image );
- // TODO Set crc file on image - if there is already a crc file assigned, this does nothing
- return new TransferInformation( token, Globals.getFiletransferPortPlain(), Globals.getFiletransferPortSsl() );
- }
-
- /**
- * Add a new allowed incoming upload connection
- * for the given token and image.
- *
- * @param token The unique token
- * @param image Image being uploaded
- */
- public static void addUpload( String token, AbstractTransfer image )
- {
- pendingIncomingUploads.put( token, image );
- log.debug( "Added upload" );
+ return transfer;
}
- /**
- * Add a new allowed incoming download connection
- * for the given token and image.
- *
- * @param token The unique token
- * @param image Image being uploaded
- */
- public static void addDownload( String token, AbstractTransfer image )
+ public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List<ByteBuffer> crcSums )
+ throws TTransferRejectedException
{
- pendingIncomingDownloads.put( token, image );
- log.debug( "Added download" );
+ IncomingTransfer transfer = incomingTransfers.get( imageData.imageVersionId );
+ if ( transfer == null )
+ return null;
+ if ( transfer.getFileSize() != imageData.fileSize )
+ throw new TTransferRejectedException( "File size mismatch" );
+ if ( !transfer.hashesEqual( crcSums ) )
+ throw new TTransferRejectedException( "Block hashes mismatch" );
+ return transfer;
}
/**
@@ -165,8 +129,32 @@ public class ConnectionHandler implements IncomingEvent
@Override
public void incomingUploadRequest( final Downloader downloader ) throws IOException
{
- // TODO
- downloader.sendErrorCode( "Too many concurrent downloads." );
- downloader.cancel();
+ IncomingTransfer transfer = incomingTransfers.get( downloader.getToken() );
+ if ( transfer == null ) {
+ downloader.sendErrorCode( "Unknown upload token." );
+ downloader.cancel();
+ return;
+ }
+ if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
+ downloader.sendErrorCode( "Too many concurrent uploads." );
+ downloader.cancel();
+ return;
+ }
+ if ( !transfer.addConnection( downloader, transferPool ) ) {
+ downloader.cancel();
+ }
}
+
+ public static int getUploadConnectionCount()
+ {
+ final long now = System.currentTimeMillis();
+ int active = 0;
+ for ( IncomingTransfer t : incomingTransfers.values() ) {
+ if ( t.countsTowardsConnectionLimit( now ) ) {
+ active += t.getActiveConnectionCount();
+ }
+ }
+ return active;
+ }
+
}