package org.openslx.imagemaster.serverconnection;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import org.apache.log4j.Logger;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.InvocationError;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.IncomingEvent;
import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
import org.openslx.filetransfer.util.AbstractTransfer;
import org.openslx.imagemaster.Globals;
import org.openslx.util.GrowingThreadPoolExecutor;
/**
* Class to handle all incoming and outgoing connections.
* Also handles the authentication and the saving/delivering of images.
*/
public class ConnectionHandler implements IncomingEvent
{
private static final Logger LOGGER = Logger.getLogger( ConnectionHandler.class );
private static final int MAX_TRANSFERS = 12;
private static Map<String, IncomingTransfer> incomingTransfers = new ConcurrentHashMap<>();
private static Map<String, AbstractTransfer> outgoingTransfers = new ConcurrentHashMap<>();
private static IncomingEvent eventHandler = new ConnectionHandler();
private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES,
new SynchronousQueue<Runnable>(),
new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ) );
private static final Listener plainListener;
private static final Listener sslListener;
static {
LOGGER.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() );
Listener ssl = null;
Listener plain = null;
try {
String pathToKeyStore = Globals.getSslKeystoreFile();
char[] passphrase = Globals.getSslKeystorePassword().toCharArray();
KeyStore keystore = KeyStore.getInstance( "JKS" );
keystore.load( new FileInputStream( pathToKeyStore ), passphrase );
KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() );
kmf.init( keystore, passphrase );
SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" );
KeyManager[] keyManagers = kmf.getKeyManagers();
sslContext.init( keyManagers, null, null );
ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 );
ssl.start();
plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 );
plain.start();
} catch ( Exception e ) {
LOGGER.error( "Initialization failed.", e );
System.exit( 2 );
}
sslListener = ssl;
plainListener = plain;
}
public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes )
throws TTransferRejectedException, TInvocationException
{
IncomingTransfer transfer;
synchronized ( incomingTransfers ) {
transfer = incomingTransfers.get( img.imageVersionId );
if ( transfer == null ) {
if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
throw new TTransferRejectedException( "Too many active transfers" );
}
try {
transfer = new IncomingTransfer( img, blockHashes );
} catch ( FileNotFoundException e ) {
LOGGER.warn( "Cannot init download", e );
throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" );
}
incomingTransfers.put( transfer.getId(), transfer );
incomingTransfers.put( img.imageVersionId, transfer );
}
}
return transfer;
}
public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List<ByteBuffer> crcSums )
throws TTransferRejectedException
{
IncomingTransfer transfer = incomingTransfers.get( imageData.imageVersionId );
if ( transfer == null )
return null;
if ( transfer.getFileSize() != imageData.fileSize )
throw new TTransferRejectedException( "File size mismatch" );
if ( !transfer.hashesEqual( crcSums ) )
throw new TTransferRejectedException( "Block hashes mismatch" );
return transfer;
}
/**
* Server is uploading - client is downloading!
*/
@Override
public void incomingDownloadRequest( final Uploader uploader )
{
// TODO
uploader.sendErrorCode( "Too many concurrent uploads." );
uploader.cancel();
}
/**
* Server is downloading - client is uploading!
*/
@Override
public void incomingUploadRequest( final Downloader downloader ) throws IOException
{
IncomingTransfer transfer = incomingTransfers.get( downloader.getToken() );
if ( transfer == null ) {
downloader.sendErrorCode( "Unknown upload token." );
downloader.cancel();
return;
}
if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
downloader.sendErrorCode( "Too many concurrent uploads." );
downloader.cancel();
return;
}
if ( !transfer.addConnection( downloader, transferPool ) ) {
downloader.cancel();
}
}
public static int getUploadConnectionCount()
{
final long now = System.currentTimeMillis();
int active = 0;
for ( IncomingTransfer t : incomingTransfers.values() ) {
if ( t.countsTowardsConnectionLimit( now ) ) {
active += t.getActiveConnectionCount();
}
}
return active;
}
}