summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/imagemaster/serverconnection
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/imagemaster/serverconnection')
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java167
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java45
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/OutgoingTransfer.java22
3 files changed, 213 insertions, 21 deletions
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
index 141e17f..f32c655 100644
--- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
@@ -1,10 +1,13 @@
package org.openslx.imagemaster.serverconnection;
+import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.KeyStore;
+import java.sql.SQLException;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -21,13 +24,17 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.InvocationError;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
+import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.IncomingEvent;
import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
-import org.openslx.filetransfer.util.AbstractTransfer;
import org.openslx.imagemaster.Globals;
+import org.openslx.imagemaster.db.mappers.DbImage;
+import org.openslx.thrifthelper.ImagePublishDataEx;
import org.openslx.util.GrowingThreadPoolExecutor;
+import org.openslx.util.QuickTimer;
+import org.openslx.util.QuickTimer.Task;
/**
* Class to handle all incoming and outgoing connections.
@@ -40,8 +47,10 @@ public class ConnectionHandler implements IncomingEvent
private static final int MAX_TRANSFERS = 12;
- private static Map<String, IncomingTransfer> incomingTransfers = new ConcurrentHashMap<>();
- private static Map<String, AbstractTransfer> outgoingTransfers = new ConcurrentHashMap<>();
+ private static Map<String, IncomingTransfer> incomingTransfersByTransferId = new ConcurrentHashMap<>();
+ private static final Map<String, IncomingTransfer> incomingTransfersByVersionId = new ConcurrentHashMap<>();
+
+ private static Map<String, OutgoingTransfer> outgoingTransfers = new ConcurrentHashMap<>();
private static IncomingEvent eventHandler = new ConnectionHandler();
private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES,
new SynchronousQueue<Runnable>(),
@@ -51,7 +60,7 @@ public class ConnectionHandler implements IncomingEvent
private static final Listener sslListener;
static {
- LOGGER.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() );
+ LOGGER.debug( "Starting BFTP on port " + Globals.getFiletransferPortSsl() + "+ and " + Globals.getFiletransferPortPlain() );
Listener ssl = null;
Listener plain = null;
try {
@@ -68,32 +77,89 @@ public class ConnectionHandler implements IncomingEvent
ssl.start();
plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 );
plain.start();
+ // TODO: Bail out/retry if failed, getters for ports
} catch ( Exception e ) {
LOGGER.error( "Initialization failed.", e );
System.exit( 2 );
}
sslListener = ssl;
plainListener = plain;
+ QuickTimer.scheduleAtFixedDelay( new Task() {
+
+ @Override
+ public void fire()
+ {
+ long now = System.currentTimeMillis();
+ for ( Iterator<IncomingTransfer> it = incomingTransfersByTransferId.values().iterator(); it.hasNext(); ) {
+ IncomingTransfer t = it.next();
+ if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) {
+ LOGGER.debug( "Removing transfer " + t.getId() );
+ it.remove();
+ }
+ }
+ for ( Iterator<IncomingTransfer> it = incomingTransfersByVersionId.values().iterator(); it.hasNext(); ) {
+ IncomingTransfer t = it.next();
+ if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) {
+ it.remove();
+ }
+ }
+ }
+ }, 10000, 301000 );
+ }
+
+ public static int getSslPort()
+ {
+ if ( sslListener.isRunning() )
+ return sslListener.getPort();
+ return 0;
+ }
+
+ public static int getPlainPort()
+ {
+ if ( plainListener.isRunning() )
+ return plainListener.getPort();
+ return 0;
}
- public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes )
+ /**
+ * Register new incoming transfer from a satellite server.
+ *
+ * @param img the desired meta data for the new image/version, as supplied by the satellite
+ * @param blockHashes list of block hashes for the image
+ * @param existing OPTIONAL if the image version already exists on the server, this is given so
+ * we can repair/complete the local file, if necessary
+ * @return The assigned transfer object
+ */
+ public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes, ImagePublishDataEx existing )
throws TTransferRejectedException, TInvocationException
{
IncomingTransfer transfer;
- synchronized ( incomingTransfers ) {
- transfer = incomingTransfers.get( img.imageVersionId );
+ synchronized ( incomingTransfersByTransferId ) {
+ transfer = incomingTransfersByVersionId.get( img.imageVersionId );
if ( transfer == null ) {
if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
throw new TTransferRejectedException( "Too many active transfers" );
}
+ File absDestination;
+ if ( existing == null ) {
+ absDestination = new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId );
+ } else {
+ absDestination = new File( Globals.getImageDir(), existing.exImagePath );
+ }
+ plainListener.start();
+ sslListener.start();
try {
- transfer = new IncomingTransfer( img, blockHashes );
+ transfer = new IncomingTransfer( img, blockHashes, absDestination, getPlainPort(), getSslPort() );
} catch ( FileNotFoundException e ) {
- LOGGER.warn( "Cannot init download", e );
+ LOGGER.warn( "Cannot init download to " + absDestination.toString(), e );
throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" );
}
- incomingTransfers.put( transfer.getId(), transfer );
- incomingTransfers.put( img.imageVersionId, transfer );
+ LOGGER.info( "New incoming upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName + ")" );
+ incomingTransfersByTransferId.put( transfer.getId(), transfer );
+ incomingTransfersByVersionId.put( img.imageVersionId, transfer );
+ } else {
+ LOGGER.info( "Another request for existing upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName
+ + ")" );
}
}
return transfer;
@@ -102,7 +168,7 @@ public class ConnectionHandler implements IncomingEvent
public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List<ByteBuffer> crcSums )
throws TTransferRejectedException
{
- IncomingTransfer transfer = incomingTransfers.get( imageData.imageVersionId );
+ IncomingTransfer transfer = incomingTransfersByVersionId.get( imageData.imageVersionId );
if ( transfer == null )
return null;
if ( transfer.getFileSize() != imageData.fileSize )
@@ -112,15 +178,31 @@ public class ConnectionHandler implements IncomingEvent
return transfer;
}
+ public static IncomingTransfer getUploadByToken( String uploadToken )
+ {
+ return incomingTransfersByTransferId.get( uploadToken );
+ }
+
/**
* Server is uploading - client is downloading!
*/
@Override
public void incomingDownloadRequest( final Uploader uploader )
{
- // TODO
- uploader.sendErrorCode( "Too many concurrent uploads." );
- uploader.cancel();
+ OutgoingTransfer transfer = outgoingTransfers.get( uploader.getToken() );
+ if ( transfer == null ) {
+ LOGGER.debug( "Unknown download token received" );
+ uploader.sendErrorCode( "Unknown download token." );
+ uploader.cancel();
+ return;
+ }
+ if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) {
+ uploader.sendErrorCode( "Too many concurrent uploads." );
+ uploader.cancel();
+ }
+ if ( !transfer.addConnection( uploader, transferPool ) ) {
+ uploader.cancel();
+ }
}
/**
@@ -129,7 +211,7 @@ public class ConnectionHandler implements IncomingEvent
@Override
public void incomingUploadRequest( final Downloader downloader ) throws IOException
{
- IncomingTransfer transfer = incomingTransfers.get( downloader.getToken() );
+ IncomingTransfer transfer = incomingTransfersByTransferId.get( downloader.getToken() );
if ( transfer == null ) {
downloader.sendErrorCode( "Unknown upload token." );
downloader.cancel();
@@ -149,7 +231,7 @@ public class ConnectionHandler implements IncomingEvent
{
final long now = System.currentTimeMillis();
int active = 0;
- for ( IncomingTransfer t : incomingTransfers.values() ) {
+ for ( IncomingTransfer t : incomingTransfersByTransferId.values() ) {
if ( t.countsTowardsConnectionLimit( now ) ) {
active += t.getActiveConnectionCount();
}
@@ -157,4 +239,55 @@ public class ConnectionHandler implements IncomingEvent
return active;
}
+ public static int getDownloadConnectionCount()
+ {
+ final long now = System.currentTimeMillis();
+ int active = 0;
+ for ( OutgoingTransfer t : outgoingTransfers.values() ) {
+ if ( t.countsTowardsConnectionLimit( now ) ) {
+ active += t.getActiveConnectionCount();
+ }
+ }
+ return active;
+ }
+
+ public static void removeUpload( IncomingTransfer transfer )
+ {
+ incomingTransfersByTransferId.remove( transfer.getId() );
+ incomingTransfersByVersionId.remove( transfer.getImageVersionId() );
+ }
+
+ public static TransferInformation registerDownload( ImagePublishDataEx img ) throws TTransferRejectedException, TInvocationException
+ {
+ OutgoingTransfer transfer;
+ File absSource;
+ absSource = new File( Globals.getImageDir(), img.exImagePath );
+ if ( !absSource.exists() ) {
+ LOGGER.error( absSource.toString() + " missing!" );
+ try {
+ DbImage.markValid( img.imageVersionId, false );
+ } catch ( SQLException e ) {
+ }
+ throw new TTransferRejectedException( "File missing on server" );
+ }
+ if ( absSource.length() != img.fileSize ) {
+ LOGGER.error( absSource.toString() + " has unexpected size (is: " + absSource.length() + ", should: " + img.fileSize + ")" );
+ try {
+ DbImage.markValid( img.imageVersionId, false );
+ } catch ( SQLException e ) {
+ }
+ throw new TTransferRejectedException( "File corrupted on server" );
+ }
+ synchronized ( outgoingTransfers ) {
+ if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) {
+ throw new TTransferRejectedException( "Too many active transfers" );
+ }
+ plainListener.start();
+ sslListener.start();
+ transfer = new OutgoingTransfer( absSource, getPlainPort(), getSslPort() );
+ outgoingTransfers.put( transfer.getId(), transfer );
+ }
+ return transfer.getTransferInfo();
+ }
+
}
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java b/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java
index bfc65e1..53842be 100644
--- a/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java
@@ -3,9 +3,11 @@ package org.openslx.imagemaster.serverconnection;
import java.io.File;
import java.io.FileNotFoundException;
import java.nio.ByteBuffer;
+import java.sql.SQLException;
import java.util.List;
import java.util.UUID;
+import org.apache.log4j.Logger;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
@@ -13,6 +15,7 @@ import org.openslx.filetransfer.util.ChunkStatus;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.filetransfer.util.IncomingTransferBase;
import org.openslx.imagemaster.Globals;
+import org.openslx.imagemaster.db.mappers.DbImage;
import org.openslx.imagemaster.db.mappers.DbImageBlock;
import org.openslx.imagemaster.util.Util;
import org.openslx.util.ThriftUtil;
@@ -20,16 +23,33 @@ import org.openslx.util.ThriftUtil;
public class IncomingTransfer extends IncomingTransferBase
{
+ private static final Logger LOGGER = Logger.getLogger( IncomingTransfer.class );
+
private static final long MIN_FREE_SPACE_BYTES = FileChunk.CHUNK_SIZE * 10;
private final String imageVersionId;
- public IncomingTransfer( ImagePublishData img, List<ByteBuffer> blockHashes )
+ private final TransferInformation transferInfo;
+
+ public IncomingTransfer( ImagePublishData img, List<ByteBuffer> blockHashes, File absDestination, int plainPort, int sslPort )
throws TInvocationException, FileNotFoundException
{
- super( UUID.randomUUID().toString(), new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId ),
- img.fileSize, ThriftUtil.unwrapByteBufferList( blockHashes ) );
+ super( UUID.randomUUID().toString(), absDestination, img.fileSize, ThriftUtil.unwrapByteBufferList( blockHashes ) );
this.imageVersionId = img.imageVersionId;
+ this.transferInfo = new TransferInformation( getId(), plainPort, sslPort );
+ // If the file already exists, see if any chunks are already complete
+ if ( absDestination.exists() && absDestination.length() > 0 ) {
+ try {
+ List<Boolean> statusList = DbImageBlock.getMissingStatusList( img.imageVersionId );
+ if ( !statusList.isEmpty() ) {
+ getChunks().resumeFromStatusList( statusList, absDestination.length() );
+ for ( int i = 0; i < 3; ++i ) {
+ queueUnhashedChunk( false );
+ }
+ }
+ } catch ( SQLException e ) {
+ }
+ }
}
@Override
@@ -56,18 +76,30 @@ public class IncomingTransfer extends IncomingTransferBase
protected boolean finishIncomingTransfer()
{
potentialFinishTime.set( System.currentTimeMillis() );
+ try {
+ DbImage.markValid( this.imageVersionId, true );
+ } catch ( SQLException e ) {
+ // Nothing to do
+ }
return true;
}
@Override
public TransferInformation getTransferInfo()
{
- return new TransferInformation( getId(), Globals.getFiletransferPortPlain(), Globals.getFiletransferPortSsl() );
+ return transferInfo;
}
@Override
protected void chunkStatusChanged( FileChunk chunk )
{
+ if ( chunk.getFailCount() > 6 ) {
+ cancel();
+ LOGGER.warn( "Server is cancelling upload of Version " + imageVersionId
+ + ": Hash check for block " + chunk.getChunkIndex()
+ + " failed " + chunk.getFailCount()
+ + " times." );
+ }
ChunkStatus status = chunk.getStatus();
if ( status == ChunkStatus.MISSING || status == ChunkStatus.COMPLETE ) {
try {
@@ -78,4 +110,9 @@ public class IncomingTransfer extends IncomingTransferBase
}
}
+ public Object getImageVersionId()
+ {
+ return imageVersionId;
+ }
+
}
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/OutgoingTransfer.java b/src/main/java/org/openslx/imagemaster/serverconnection/OutgoingTransfer.java
new file mode 100644
index 0000000..a6f80b2
--- /dev/null
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/OutgoingTransfer.java
@@ -0,0 +1,22 @@
+package org.openslx.imagemaster.serverconnection;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.openslx.filetransfer.util.OutgoingTransferBase;
+
+public class OutgoingTransfer extends OutgoingTransferBase
+{
+
+ public OutgoingTransfer( File sourceFile, int plainPort, int sslPort )
+ {
+ super( UUID.randomUUID().toString(), sourceFile, plainPort, sslPort );
+ }
+
+ @Override
+ public String getRelativePath()
+ {
+ return null;
+ }
+
+}