package org.openslx.imagemaster.serverconnection;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import org.apache.log4j.Logger;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.InvocationError;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.IncomingEvent;
import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
import org.openslx.imagemaster.Globals;
import org.openslx.imagemaster.db.mappers.DbImage;
import org.openslx.thrifthelper.ImagePublishDataEx;
import org.openslx.util.GrowingThreadPoolExecutor;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;
/**
 * Class to handle all incoming and outgoing connections.
 * Also handles the authentication and the saving/delivering of images.
 * <p>
 * All bookkeeping (known transfers, listeners) is static; a single instance of
 * this class is created only to serve as the {@link IncomingEvent} callback for
 * both listeners. Transfers are identified by their transfer id, which is the
 * token a connecting peer has to present.
 */
public class ConnectionHandler implements IncomingEvent
{

	private static final Logger LOGGER = Logger.getLogger( ConnectionHandler.class );

	/** Maximum number of active connections, checked separately for uploads and downloads. */
	private static final int MAX_TRANSFERS = 12;

	/**
	 * Incoming transfers by transfer id (the token presented on connect).
	 * Also used as the monitor guarding registration of new uploads.
	 */
	private static final Map<String, IncomingTransfer> incomingTransfersByTransferId = new ConcurrentHashMap<>();

	/** Incoming transfers by image version id, used to detect repeated requests for the same version. */
	private static final Map<String, IncomingTransfer> incomingTransfersByVersionId = new ConcurrentHashMap<>();

	/**
	 * Outgoing transfers by transfer id. Also used as the monitor guarding registration of new downloads.
	 * NOTE(review): nothing in this class ever removes entries from this map - confirm
	 * whether finished outgoing transfers should be purged like the incoming ones.
	 */
	private static final Map<String, OutgoingTransfer> outgoingTransfers = new ConcurrentHashMap<>();

	/** Callback instance handed to both listeners. */
	private static final IncomingEvent eventHandler = new ConnectionHandler();

	/** Pool that the individual transfer connections are handed to, running below normal priority. */
	private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES,
			new SynchronousQueue<Runnable>(),
			new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ) );

	private static final Listener plainListener;
	private static final Listener sslListener;

	static {
		LOGGER.debug( "Starting BFTP on port " + Globals.getFiletransferPortSsl() + "+ and " + Globals.getFiletransferPortPlain() );
		Listener ssl = null;
		Listener plain = null;
		try {
			String pathToKeyStore = Globals.getSslKeystoreFile();
			char[] passphrase = Globals.getSslKeystorePassword().toCharArray();
			KeyStore keystore = KeyStore.getInstance( "JKS" );
			// KeyStore.load() does not close the stream it reads from, so close it ourselves
			try ( FileInputStream keystoreStream = new FileInputStream( pathToKeyStore ) ) {
				keystore.load( keystoreStream, passphrase );
			}
			KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() );
			kmf.init( keystore, passphrase );
			SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" );
			KeyManager[] keyManagers = kmf.getKeyManagers();
			sslContext.init( keyManagers, null, null );
			ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 );
			ssl.start();
			plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 );
			plain.start();
			// TODO: Bail out/retry if failed, getters for ports
		} catch ( Exception e ) {
			// Without working listeners there is nothing useful this process can do
			LOGGER.error( "Initialization failed.", e );
			System.exit( 2 );
		}
		sslListener = ssl;
		plainListener = plain;
		// Periodically forget about incoming transfers that are finished or have been idle for too long
		QuickTimer.scheduleAtFixedDelay( new Task() {
			@Override
			public void fire()
			{
				long now = System.currentTimeMillis();
				purgeFinishedUploads( incomingTransfersByTransferId, now, true );
				purgeFinishedUploads( incomingTransfersByVersionId, now, false );
			}
		}, 10000, 301000 );
	}

	/**
	 * Remove all transfers from the given map which are either complete or have
	 * reached their idle timeout.
	 *
	 * @param transfers map to clean up; entries are removed through the value iterator
	 * @param now current time in milliseconds, passed on to the transfers' checks
	 * @param logRemoval whether to emit a debug message for every removed transfer
	 */
	private static void purgeFinishedUploads( Map<String, IncomingTransfer> transfers, long now, boolean logRemoval )
	{
		for ( Iterator<IncomingTransfer> it = transfers.values().iterator(); it.hasNext(); ) {
			IncomingTransfer t = it.next();
			if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) {
				if ( logRemoval ) {
					LOGGER.debug( "Removing transfer " + t.getId() );
				}
				it.remove();
			}
		}
	}

	/**
	 * Get the port the SSL listener is using.
	 *
	 * @return port of the SSL listener, or 0 if it is not running
	 */
	public static int getSslPort()
	{
		if ( sslListener.isRunning() )
			return sslListener.getPort();
		return 0;
	}

	/**
	 * Get the port the plain (unencrypted) listener is using.
	 *
	 * @return port of the plain listener, or 0 if it is not running
	 */
	public static int getPlainPort()
	{
		if ( plainListener.isRunning() )
			return plainListener.getPort();
		return 0;
	}

	/**
	 * Register new incoming transfer from a satellite server.
	 * If a transfer for the same image version id is already known, that one is
	 * returned instead of creating a new one.
	 *
	 * @param img the desired meta data for the new image/version, as supplied by the satellite
	 * @param blockHashes list of block hashes for the image
	 * @param existing OPTIONAL if the image version already exists on the server, this is given so
	 *           we can repair/complete the local file, if necessary
	 * @return The assigned transfer object
	 * @throws TTransferRejectedException if the limit of active upload connections is reached
	 * @throws TInvocationException if the destination file cannot be accessed
	 */
	public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes, ImagePublishDataEx existing )
			throws TTransferRejectedException, TInvocationException
	{
		IncomingTransfer transfer;
		// Lookup and insertion have to happen atomically, so two requests for the
		// same version id cannot both create a transfer
		synchronized ( incomingTransfersByTransferId ) {
			transfer = incomingTransfersByVersionId.get( img.imageVersionId );
			if ( transfer == null ) {
				if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
					throw new TTransferRejectedException( "Too many active transfers" );
				}
				File absDestination;
				if ( existing == null ) {
					// New version: <imageDir>/<imageBaseId>/<imageVersionId>
					absDestination = new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId );
				} else {
					// Known version: reuse the path it already has
					absDestination = new File( Globals.getImageDir(), existing.exImagePath );
				}
				// NOTE(review): presumably (re)starts a listener that has stopped and is
				// harmless otherwise - confirm against Listener.start()
				plainListener.start();
				sslListener.start();
				try {
					transfer = new IncomingTransfer( img, blockHashes, absDestination, getPlainPort(), getSslPort() );
				} catch ( FileNotFoundException e ) {
					LOGGER.warn( "Cannot init download to " + absDestination.toString(), e );
					throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" );
				}
				LOGGER.info( "New incoming upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName + ")" );
				incomingTransfersByTransferId.put( transfer.getId(), transfer );
				incomingTransfersByVersionId.put( img.imageVersionId, transfer );
			} else {
				LOGGER.info( "Another request for existing upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName
						+ ")" );
			}
		}
		return transfer;
	}

	/**
	 * Look up an already registered upload for the given image version and make
	 * sure it matches the given meta data.
	 *
	 * @param imageData meta data of the image version to look for
	 * @param crcSums block hashes the existing transfer has to agree with
	 * @return the matching transfer, or <code>null</code> if no upload is registered for that version
	 * @throws TTransferRejectedException if a transfer exists, but its file size or block hashes differ
	 */
	public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List<ByteBuffer> crcSums )
			throws TTransferRejectedException
	{
		IncomingTransfer transfer = incomingTransfersByVersionId.get( imageData.imageVersionId );
		if ( transfer == null )
			return null;
		if ( transfer.getFileSize() != imageData.fileSize )
			throw new TTransferRejectedException( "File size mismatch" );
		if ( !transfer.hashesEqual( crcSums ) )
			throw new TTransferRejectedException( "Block hashes mismatch" );
		return transfer;
	}

	/**
	 * Look up an incoming transfer by its token.
	 *
	 * @param uploadToken transfer id of the upload
	 * @return the transfer, or <code>null</code> if the token is unknown
	 */
	public static IncomingTransfer getUploadByToken( String uploadToken )
	{
		return incomingTransfersByTransferId.get( uploadToken );
	}

	/**
	 * Server is uploading - client is downloading!
	 * The connection is cancelled if its token is unknown, if the limit of
	 * active download connections is reached, or if the transfer refuses it.
	 */
	@Override
	public void incomingDownloadRequest( final Uploader uploader )
	{
		OutgoingTransfer transfer = outgoingTransfers.get( uploader.getToken() );
		if ( transfer == null ) {
			LOGGER.debug( "Unknown download token received" );
			uploader.sendErrorCode( "Unknown download token." );
			uploader.cancel();
			return;
		}
		if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) {
			uploader.sendErrorCode( "Too many concurrent uploads." );
			uploader.cancel();
			// Must not fall through: the connection has been rejected and cancelled
			return;
		}
		if ( !transfer.addConnection( uploader, transferPool ) ) {
			uploader.cancel();
		}
	}

	/**
	 * Server is downloading - client is uploading!
	 * The connection is cancelled if its token is unknown, if the limit of
	 * active upload connections is reached, or if the transfer refuses it.
	 */
	@Override
	public void incomingUploadRequest( final Downloader downloader ) throws IOException
	{
		IncomingTransfer transfer = incomingTransfersByTransferId.get( downloader.getToken() );
		if ( transfer == null ) {
			downloader.sendErrorCode( "Unknown upload token." );
			downloader.cancel();
			return;
		}
		if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
			downloader.sendErrorCode( "Too many concurrent uploads." );
			downloader.cancel();
			return;
		}
		if ( !transfer.addConnection( downloader, transferPool ) ) {
			downloader.cancel();
		}
	}

	/**
	 * Count the active connections of all incoming transfers that count
	 * towards the connection limit.
	 *
	 * @return number of active upload connections
	 */
	public static int getUploadConnectionCount()
	{
		final long now = System.currentTimeMillis();
		int active = 0;
		for ( IncomingTransfer t : incomingTransfersByTransferId.values() ) {
			if ( t.countsTowardsConnectionLimit( now ) ) {
				active += t.getActiveConnectionCount();
			}
		}
		return active;
	}

	/**
	 * Count the active connections of all outgoing transfers that count
	 * towards the connection limit.
	 *
	 * @return number of active download connections
	 */
	public static int getDownloadConnectionCount()
	{
		final long now = System.currentTimeMillis();
		int active = 0;
		for ( OutgoingTransfer t : outgoingTransfers.values() ) {
			if ( t.countsTowardsConnectionLimit( now ) ) {
				active += t.getActiveConnectionCount();
			}
		}
		return active;
	}

	/**
	 * Forget about the given incoming transfer, removing it from both lookup maps.
	 *
	 * @param transfer the transfer to remove
	 */
	public static void removeUpload( IncomingTransfer transfer )
	{
		incomingTransfersByTransferId.remove( transfer.getId() );
		incomingTransfersByVersionId.remove( transfer.getImageVersionId() );
	}

	/**
	 * Register a new outgoing transfer for the given image version.
	 * Before doing so, the image file is checked for existence and expected
	 * size; if either check fails, the version is marked invalid in the database.
	 *
	 * @param img meta data of the image version to deliver, including its local path
	 * @return connection information for the newly created transfer
	 * @throws TTransferRejectedException if the file is missing, has an unexpected size, or
	 *            the limit of active download connections is reached
	 * @throws TInvocationException declared for callers; not thrown by the visible code
	 */
	public static TransferInformation registerDownload( ImagePublishDataEx img ) throws TTransferRejectedException, TInvocationException
	{
		OutgoingTransfer transfer;
		File absSource = new File( Globals.getImageDir(), img.exImagePath );
		if ( !absSource.exists() ) {
			LOGGER.error( absSource.toString() + " missing!" );
			markInvalid( img.imageVersionId );
			throw new TTransferRejectedException( "File missing on server" );
		}
		if ( absSource.length() != img.fileSize ) {
			LOGGER.error( absSource.toString() + " has unexpected size (is: " + absSource.length() + ", should: " + img.fileSize + ")" );
			markInvalid( img.imageVersionId );
			throw new TTransferRejectedException( "File corrupted on server" );
		}
		synchronized ( outgoingTransfers ) {
			if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) {
				throw new TTransferRejectedException( "Too many active transfers" );
			}
			// NOTE(review): presumably (re)starts a listener that has stopped and is
			// harmless otherwise - confirm against Listener.start()
			plainListener.start();
			sslListener.start();
			transfer = new OutgoingTransfer( absSource, getPlainPort(), getSslPort() );
			outgoingTransfers.put( transfer.getId(), transfer );
		}
		return transfer.getTransferInfo();
	}

	/**
	 * Flag the given image version as invalid in the database. This is best
	 * effort: a database error is logged, but not propagated, so the caller
	 * can still reject the transfer with a meaningful message.
	 *
	 * @param imageVersionId id of the image version to mark invalid
	 */
	private static void markInvalid( String imageVersionId )
	{
		try {
			DbImage.markValid( imageVersionId, false );
		} catch ( SQLException e ) {
			LOGGER.warn( "Could not mark image version " + imageVersionId + " as invalid", e );
		}
	}

}