summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/imagemaster/serverconnection
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/imagemaster/serverconnection')
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java92
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java172
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java81
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java24
4 files changed, 185 insertions, 184 deletions
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java b/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java
deleted file mode 100644
index 3acac5b..0000000
--- a/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.openslx.imagemaster.serverconnection;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-public abstract class AbstractTransfer {
-
- /**
- * How long to keep this transfer information when the transfer is
- * (potentially) done
- */
- private static final long FINISH_TIMEOUT = TimeUnit.MINUTES.toMillis(5);
-
- /**
- * How long to keep this transfer information when there are no active
- * connections and the transfer seems unfinished
- */
- private static final long IDLE_TIMEOUT = TimeUnit.HOURS.toMillis(4);
-
- /**
- * Time stamp of when (we think) the transfer finished. Clients can/might
- * not tell us they're done, and simply taking "no active connection" as a
- * sign the download is done might have unwanted effects if the user's
- * connection drops for a minute. If this time stamp (plus a FINISH_TIMEOUT)
- * passed,
- * we consider the download done and flag it for removal.
- * If set to zero, the transfer is not finished, or not assumed to have
- * finished.
- */
- protected final AtomicLong potentialFinishTime = new AtomicLong(0);
-
- /**
- * Time of last activity on this transfer.
- */
- protected final AtomicLong lastActivityTime = new AtomicLong(System.currentTimeMillis());
-
- private final String transferId;
-
- public AbstractTransfer(String transferId) {
- this.transferId = transferId;
- }
-
- /**
- * Returns true if the transfer is considered completed.
- *
- * @param now pass System.currentTimeMillis()
- * @return true if the transfer is considered completed
- */
- public boolean isComplete(long now) {
- long val = potentialFinishTime.get();
- return val != 0 && val + FINISH_TIMEOUT < now;
- }
-
- /**
- * Returns true if there has been no activity on this transfer for a certain
- * amount of time.
- *
- * @param now pass System.currentTimeMillis()
- * @return true if the transfer reached its idle timeout
- */
- public final boolean hasReachedIdleTimeout(long now) {
- return getActiveConnectionCount() == 0 && lastActivityTime.get() + IDLE_TIMEOUT < now;
- }
-
- public final String getId() {
- return transferId;
- }
-
- /**
- * Returns true if this transfer would potentially accept new connections.
- * This should NOT return false if there are too many concurrent
- * connections, as this is used to signal the client whether to keep trying
- * to connect.
- *
- * @return true if this transfer would potentially accept new connections
- */
- public abstract boolean isActive();
-
- /**
- * Cancel this transfer, aborting all active connections and rejecting
- * further incoming ones.
- */
- public abstract void cancel();
-
- /**
- * Returns number of active transfer connections.
- *
- * @return number of active transfer connections
- */
- public abstract int getActiveConnectionCount();
-
-}
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
index 44c8e16..141e17f 100644
--- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
@@ -1,13 +1,15 @@
package org.openslx.imagemaster.serverconnection;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManager;
@@ -19,17 +21,13 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.InvocationError;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
-import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.IncomingEvent;
import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
+import org.openslx.filetransfer.util.AbstractTransfer;
import org.openslx.imagemaster.Globals;
-import org.openslx.imagemaster.crcchecker.CrcFile;
-import org.openslx.imagemaster.db.mappers.DbOsVirt;
-import org.openslx.imagemaster.db.mappers.DbUser;
-import org.openslx.imagemaster.util.RandomString;
-import org.openslx.imagemaster.util.Util;
+import org.openslx.util.GrowingThreadPoolExecutor;
/**
* Class to handle all incoming and outgoing connections.
@@ -38,19 +36,24 @@ import org.openslx.imagemaster.util.Util;
public class ConnectionHandler implements IncomingEvent
{
- private static Logger log = Logger.getLogger( ConnectionHandler.class );
- private static SSLContext sslContext;
+ private static final Logger LOGGER = Logger.getLogger( ConnectionHandler.class );
- private static Map<String, AbstractTransfer> pendingIncomingUploads = new ConcurrentHashMap<>();
- private static Map<String, AbstractTransfer> pendingIncomingDownloads = new ConcurrentHashMap<>();
+ private static final int MAX_TRANSFERS = 12;
+
+ private static Map<String, IncomingTransfer> incomingTransfers = new ConcurrentHashMap<>();
+ private static Map<String, AbstractTransfer> outgoingTransfers = new ConcurrentHashMap<>();
private static IncomingEvent eventHandler = new ConnectionHandler();
- private static ThreadPoolExecutor uploadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
- private static ThreadPoolExecutor downloadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
+ private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>(),
+ new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ) );
- private static Listener listener;
+ private static final Listener plainListener;
+ private static final Listener sslListener;
static {
- log.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() );
+ LOGGER.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() );
+ Listener ssl = null;
+ Listener plain = null;
try {
String pathToKeyStore = Globals.getSslKeystoreFile();
char[] passphrase = Globals.getSslKeystorePassword().toCharArray();
@@ -58,94 +61,55 @@ public class ConnectionHandler implements IncomingEvent
keystore.load( new FileInputStream( pathToKeyStore ), passphrase );
KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() );
kmf.init( keystore, passphrase );
- sslContext = SSLContext.getInstance( "TLSv1.2" );
+ SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" );
KeyManager[] keyManagers = kmf.getKeyManagers();
sslContext.init( keyManagers, null, null );
- listener = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 );
- listener.start();
+ ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 );
+ ssl.start();
+ plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 );
+ plain.start();
} catch ( Exception e ) {
- log.error( "Initialization failed.", e );
+ LOGGER.error( "Initialization failed.", e );
System.exit( 2 );
}
+ sslListener = ssl;
+ plainListener = plain;
}
- /**
- * Checks if this image is already uploading and returns a new list with missing blocks if so.
- * Puts the new image into processing list else.
- *
- * @param serverSessionId The uploading server
- * @param imageData The data of the image
- * @return
- * @throws UploadException If some error occurred during the process
- */
- public static TransferInformation getUploadInfos( ImagePublishData imageData, List<Integer> crcSums )
+ public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes )
throws TTransferRejectedException, TInvocationException
{
- // check image data
- if ( Util.isEmpty( imageData.imageName ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Image name not set" );
- if ( !DbUser.exists( imageData.user ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Invalid or missing image owner" );
- if ( DbOsVirt.osExists( imageData.osId ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Content operating system not set" );
- if ( DbOsVirt.virtExists( imageData.virtId ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Content virtualizer system not set" );
- if ( imageData.fileSize <= 0 )
- throw new TInvocationException( InvocationError.INVALID_DATA, "File size is too small" );
-
- log.debug( "A satellite is submitting " + imageData.imageVersionId );
-
- final String uuid = imageData.imageVersionId;
- final String filepathRelative;
- final CrcFile crcFile;
- if ( crcSums == null ) {
- crcFile = null;
- } else {
- crcFile = new CrcFile( crcSums );
- }
- ImagePublishData image;
-
- synchronized ( pendingIncomingUploads ) {
- /*
- // check if image is already uploading
- if ( ( image = uploadingImages.get( uuid ) ) == null ) {
- // TODO insert new image to DB
- uploadingImages.put( uuid, image );
+ IncomingTransfer transfer;
+ synchronized ( incomingTransfers ) {
+ transfer = incomingTransfers.get( img.imageVersionId );
+ if ( transfer == null ) {
+ if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
+ throw new TTransferRejectedException( "Too many active transfers" );
+ }
+ try {
+ transfer = new IncomingTransfer( img, blockHashes );
+ } catch ( FileNotFoundException e ) {
+ LOGGER.warn( "Cannot init download", e );
+ throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" );
+ }
+ incomingTransfers.put( transfer.getId(), transfer );
+ incomingTransfers.put( img.imageVersionId, transfer );
}
- */
}
-
- final String token = RandomString.generate( 50, false );
-
- // TODO addUpload( token, image );
- // TODO Set crc file on image - if there is already a crc file assigned, this does nothing
- return new TransferInformation( token, Globals.getFiletransferPortPlain(), Globals.getFiletransferPortSsl() );
- }
-
- /**
- * Add a new allowed incoming upload connection
- * for the given token and image.
- *
- * @param token The unique token
- * @param image Image being uploaded
- */
- public static void addUpload( String token, AbstractTransfer image )
- {
- pendingIncomingUploads.put( token, image );
- log.debug( "Added upload" );
+ return transfer;
}
- /**
- * Add a new allowed incoming download connection
- * for the given token and image.
- *
- * @param token The unique token
- * @param image Image being uploaded
- */
- public static void addDownload( String token, AbstractTransfer image )
+ public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List<ByteBuffer> crcSums )
+ throws TTransferRejectedException
{
- pendingIncomingDownloads.put( token, image );
- log.debug( "Added download" );
+ IncomingTransfer transfer = incomingTransfers.get( imageData.imageVersionId );
+ if ( transfer == null )
+ return null;
+ if ( transfer.getFileSize() != imageData.fileSize )
+ throw new TTransferRejectedException( "File size mismatch" );
+ if ( !transfer.hashesEqual( crcSums ) )
+ throw new TTransferRejectedException( "Block hashes mismatch" );
+ return transfer;
}
/**
@@ -165,8 +129,32 @@ public class ConnectionHandler implements IncomingEvent
@Override
public void incomingUploadRequest( final Downloader downloader ) throws IOException
{
- // TODO
- downloader.sendErrorCode( "Too many concurrent downloads." );
- downloader.cancel();
+ IncomingTransfer transfer = incomingTransfers.get( downloader.getToken() );
+ if ( transfer == null ) {
+ downloader.sendErrorCode( "Unknown upload token." );
+ downloader.cancel();
+ return;
+ }
+ if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
+ downloader.sendErrorCode( "Too many concurrent uploads." );
+ downloader.cancel();
+ return;
+ }
+ if ( !transfer.addConnection( downloader, transferPool ) ) {
+ downloader.cancel();
+ }
}
+
+ public static int getUploadConnectionCount()
+ {
+ final long now = System.currentTimeMillis();
+ int active = 0;
+ for ( IncomingTransfer t : incomingTransfers.values() ) {
+ if ( t.countsTowardsConnectionLimit( now ) ) {
+ active += t.getActiveConnectionCount();
+ }
+ }
+ return active;
+ }
+
}
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java b/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java
new file mode 100644
index 0000000..bfc65e1
--- /dev/null
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java
@@ -0,0 +1,81 @@
+package org.openslx.imagemaster.serverconnection;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+
+import org.openslx.bwlp.thrift.iface.ImagePublishData;
+import org.openslx.bwlp.thrift.iface.TInvocationException;
+import org.openslx.bwlp.thrift.iface.TransferInformation;
+import org.openslx.filetransfer.util.ChunkStatus;
+import org.openslx.filetransfer.util.FileChunk;
+import org.openslx.filetransfer.util.IncomingTransferBase;
+import org.openslx.imagemaster.Globals;
+import org.openslx.imagemaster.db.mappers.DbImageBlock;
+import org.openslx.imagemaster.util.Util;
+import org.openslx.util.ThriftUtil;
+
+public class IncomingTransfer extends IncomingTransferBase
+{
+
+ private static final long MIN_FREE_SPACE_BYTES = FileChunk.CHUNK_SIZE * 10;
+
+ private final String imageVersionId;
+
+ public IncomingTransfer( ImagePublishData img, List<ByteBuffer> blockHashes )
+ throws TInvocationException, FileNotFoundException
+ {
+ super( UUID.randomUUID().toString(), new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId ),
+ img.fileSize, ThriftUtil.unwrapByteBufferList( blockHashes ) );
+ this.imageVersionId = img.imageVersionId;
+ }
+
+ @Override
+ public String getRelativePath()
+ {
+ return Util.getRelativePath( getTmpFileName(), new File( Globals.getImageDir() ) );
+ }
+
+ @Override
+ public synchronized void cancel()
+ {
+ super.cancel();
+ getTmpFileName().delete();
+ }
+
+ @Override
+ protected boolean hasEnoughFreeSpace()
+ {
+ long space = Globals.getImagePath().getUsableSpace();
+ return space > MIN_FREE_SPACE_BYTES;
+ }
+
+ @Override
+ protected boolean finishIncomingTransfer()
+ {
+ potentialFinishTime.set( System.currentTimeMillis() );
+ return true;
+ }
+
+ @Override
+ public TransferInformation getTransferInfo()
+ {
+ return new TransferInformation( getId(), Globals.getFiletransferPortPlain(), Globals.getFiletransferPortSsl() );
+ }
+
+ @Override
+ protected void chunkStatusChanged( FileChunk chunk )
+ {
+ ChunkStatus status = chunk.getStatus();
+ if ( status == ChunkStatus.MISSING || status == ChunkStatus.COMPLETE ) {
+ try {
+ DbImageBlock.asyncUpdate( imageVersionId, chunk );
+ } catch ( InterruptedException e ) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java b/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java
new file mode 100644
index 0000000..5fa9da4
--- /dev/null
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java
@@ -0,0 +1,24 @@
+package org.openslx.imagemaster.serverconnection;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PrioThreadFactory implements ThreadFactory {
+
+ private final AtomicInteger counter = new AtomicInteger();
+ private final String name;
+ private final int priority;
+
+ public PrioThreadFactory(String name, int priority) {
+ this.name = name;
+ this.priority = priority;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, name + "-" + counter.incrementAndGet());
+ thread.setPriority(priority);
+ return thread;
+ }
+
+}