summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java')
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java167
1 files changed, 150 insertions, 17 deletions
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
index 141e17f..f32c655 100644
--- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
@@ -1,10 +1,13 @@
package org.openslx.imagemaster.serverconnection;
+import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.KeyStore;
+import java.sql.SQLException;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -21,13 +24,17 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.InvocationError;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
+import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.IncomingEvent;
import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
-import org.openslx.filetransfer.util.AbstractTransfer;
import org.openslx.imagemaster.Globals;
+import org.openslx.imagemaster.db.mappers.DbImage;
+import org.openslx.thrifthelper.ImagePublishDataEx;
import org.openslx.util.GrowingThreadPoolExecutor;
+import org.openslx.util.QuickTimer;
+import org.openslx.util.QuickTimer.Task;
/**
* Class to handle all incoming and outgoing connections.
@@ -40,8 +47,10 @@ public class ConnectionHandler implements IncomingEvent
private static final int MAX_TRANSFERS = 12;
- private static Map<String, IncomingTransfer> incomingTransfers = new ConcurrentHashMap<>();
- private static Map<String, AbstractTransfer> outgoingTransfers = new ConcurrentHashMap<>();
+ private static Map<String, IncomingTransfer> incomingTransfersByTransferId = new ConcurrentHashMap<>();
+ private static final Map<String, IncomingTransfer> incomingTransfersByVersionId = new ConcurrentHashMap<>();
+
+ private static Map<String, OutgoingTransfer> outgoingTransfers = new ConcurrentHashMap<>();
private static IncomingEvent eventHandler = new ConnectionHandler();
private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES,
new SynchronousQueue<Runnable>(),
@@ -51,7 +60,7 @@ public class ConnectionHandler implements IncomingEvent
private static final Listener sslListener;
static {
- LOGGER.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() );
+ LOGGER.debug( "Starting BFTP on port " + Globals.getFiletransferPortSsl() + "+ and " + Globals.getFiletransferPortPlain() );
Listener ssl = null;
Listener plain = null;
try {
@@ -68,32 +77,89 @@ public class ConnectionHandler implements IncomingEvent
ssl.start();
plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 );
plain.start();
+ // TODO: Bail out/retry if failed, getters for ports
} catch ( Exception e ) {
LOGGER.error( "Initialization failed.", e );
System.exit( 2 );
}
sslListener = ssl;
plainListener = plain;
+ QuickTimer.scheduleAtFixedDelay( new Task() {
+
+ @Override
+ public void fire()
+ {
+ long now = System.currentTimeMillis();
+ for ( Iterator<IncomingTransfer> it = incomingTransfersByTransferId.values().iterator(); it.hasNext(); ) {
+ IncomingTransfer t = it.next();
+ if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) {
+ LOGGER.debug( "Removing transfer " + t.getId() );
+ it.remove();
+ }
+ }
+ for ( Iterator<IncomingTransfer> it = incomingTransfersByVersionId.values().iterator(); it.hasNext(); ) {
+ IncomingTransfer t = it.next();
+ if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) {
+ it.remove();
+ }
+ }
+ }
+ }, 10000, 301000 );
+ }
+
+ public static int getSslPort()
+ {
+ if ( sslListener.isRunning() )
+ return sslListener.getPort();
+ return 0;
+ }
+
+ public static int getPlainPort()
+ {
+ if ( plainListener.isRunning() )
+ return plainListener.getPort();
+ return 0;
}
- public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes )
+ /**
+ * Register new incoming transfer from a satellite server.
+ *
+ * @param img the desired meta data for the new image/version, as supplied by the satellite
+ * @param blockHashes list of block hashes for the image
+ * @param existing OPTIONAL if the image version already exists on the server, this is given so
+ * we can repair/complete the local file, if necessary
+ * @return The assigned transfer object
+ */
+ public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes, ImagePublishDataEx existing )
throws TTransferRejectedException, TInvocationException
{
IncomingTransfer transfer;
- synchronized ( incomingTransfers ) {
- transfer = incomingTransfers.get( img.imageVersionId );
+ synchronized ( incomingTransfersByTransferId ) {
+ transfer = incomingTransfersByVersionId.get( img.imageVersionId );
if ( transfer == null ) {
if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
throw new TTransferRejectedException( "Too many active transfers" );
}
+ File absDestination;
+ if ( existing == null ) {
+ absDestination = new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId );
+ } else {
+ absDestination = new File( Globals.getImageDir(), existing.exImagePath );
+ }
+ plainListener.start();
+ sslListener.start();
try {
- transfer = new IncomingTransfer( img, blockHashes );
+ transfer = new IncomingTransfer( img, blockHashes, absDestination, getPlainPort(), getSslPort() );
} catch ( FileNotFoundException e ) {
- LOGGER.warn( "Cannot init download", e );
+ LOGGER.warn( "Cannot init download to " + absDestination.toString(), e );
throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" );
}
- incomingTransfers.put( transfer.getId(), transfer );
- incomingTransfers.put( img.imageVersionId, transfer );
+ LOGGER.info( "New incoming upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName + ")" );
+ incomingTransfersByTransferId.put( transfer.getId(), transfer );
+ incomingTransfersByVersionId.put( img.imageVersionId, transfer );
+ } else {
+ LOGGER.info( "Another request for existing upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName
+ + ")" );
}
}
return transfer;
@@ -102,7 +168,7 @@ public class ConnectionHandler implements IncomingEvent
public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List<ByteBuffer> crcSums )
throws TTransferRejectedException
{
- IncomingTransfer transfer = incomingTransfers.get( imageData.imageVersionId );
+ IncomingTransfer transfer = incomingTransfersByVersionId.get( imageData.imageVersionId );
if ( transfer == null )
return null;
if ( transfer.getFileSize() != imageData.fileSize )
@@ -112,15 +178,31 @@ public class ConnectionHandler implements IncomingEvent
return transfer;
}
+ public static IncomingTransfer getUploadByToken( String uploadToken )
+ {
+ return incomingTransfersByTransferId.get( uploadToken );
+ }
+
/**
* Server is uploading - client is downloading!
*/
@Override
public void incomingDownloadRequest( final Uploader uploader )
{
- // TODO
- uploader.sendErrorCode( "Too many concurrent uploads." );
- uploader.cancel();
+ OutgoingTransfer transfer = outgoingTransfers.get( uploader.getToken() );
+ if ( transfer == null ) {
+ LOGGER.debug( "Unknown download token received" );
+ uploader.sendErrorCode( "Unknown download token." );
+ uploader.cancel();
+ return;
+ }
+ if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) {
+ uploader.sendErrorCode( "Too many concurrent uploads." );
+ uploader.cancel();
+ }
+ if ( !transfer.addConnection( uploader, transferPool ) ) {
+ uploader.cancel();
+ }
}
/**
@@ -129,7 +211,7 @@ public class ConnectionHandler implements IncomingEvent
@Override
public void incomingUploadRequest( final Downloader downloader ) throws IOException
{
- IncomingTransfer transfer = incomingTransfers.get( downloader.getToken() );
+ IncomingTransfer transfer = incomingTransfersByTransferId.get( downloader.getToken() );
if ( transfer == null ) {
downloader.sendErrorCode( "Unknown upload token." );
downloader.cancel();
@@ -149,7 +231,7 @@ public class ConnectionHandler implements IncomingEvent
{
final long now = System.currentTimeMillis();
int active = 0;
- for ( IncomingTransfer t : incomingTransfers.values() ) {
+ for ( IncomingTransfer t : incomingTransfersByTransferId.values() ) {
if ( t.countsTowardsConnectionLimit( now ) ) {
active += t.getActiveConnectionCount();
}
@@ -157,4 +239,55 @@ public class ConnectionHandler implements IncomingEvent
return active;
}
+ public static int getDownloadConnectionCount()
+ {
+ final long now = System.currentTimeMillis();
+ int active = 0;
+ for ( OutgoingTransfer t : outgoingTransfers.values() ) {
+ if ( t.countsTowardsConnectionLimit( now ) ) {
+ active += t.getActiveConnectionCount();
+ }
+ }
+ return active;
+ }
+
+ public static void removeUpload( IncomingTransfer transfer )
+ {
+ incomingTransfersByTransferId.remove( transfer.getId() );
+ incomingTransfersByVersionId.remove( transfer.getImageVersionId() );
+ }
+
+ public static TransferInformation registerDownload( ImagePublishDataEx img ) throws TTransferRejectedException, TInvocationException
+ {
+ OutgoingTransfer transfer;
+ File absSource;
+ absSource = new File( Globals.getImageDir(), img.exImagePath );
+ if ( !absSource.exists() ) {
+ LOGGER.error( absSource.toString() + " missing!" );
+ try {
+ DbImage.markValid( img.imageVersionId, false );
+ } catch ( SQLException e ) {
+ }
+ throw new TTransferRejectedException( "File missing on server" );
+ }
+ if ( absSource.length() != img.fileSize ) {
+ LOGGER.error( absSource.toString() + " has unexpected size (is: " + absSource.length() + ", should: " + img.fileSize + ")" );
+ try {
+ DbImage.markValid( img.imageVersionId, false );
+ } catch ( SQLException e ) {
+ }
+ throw new TTransferRejectedException( "File corrupted on server" );
+ }
+ synchronized ( outgoingTransfers ) {
+ if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) {
+ throw new TTransferRejectedException( "Too many active transfers" );
+ }
+ plainListener.start();
+ sslListener.start();
+ transfer = new OutgoingTransfer( absSource, getPlainPort(), getSslPort() );
+ outgoingTransfers.put( transfer.getId(), transfer );
+ }
+ return transfer.getTransferInfo();
+ }
+
}