diff options
Diffstat (limited to 'src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java')
-rw-r--r-- | src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java | 194 |
1 files changed, 84 insertions, 110 deletions
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java index 0fb52f5..e6319c9 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java @@ -1,6 +1,5 @@ package org.openslx.imagemaster.serverconnection; -import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -10,14 +9,18 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; +import org.apache.commons.lang.mutable.MutableInt; import org.apache.log4j.Logger; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.FileRange; @@ -26,6 +29,7 @@ import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; import org.openslx.filetransfer.WantRangeCallback; import org.openslx.imagemaster.Globals; +import org.openslx.imagemaster.db.DbImage; /** * Class to handle all incoming and outgoing connections. @@ -37,8 +41,11 @@ public class ConnectionHandler implements IncomingEvent private static Logger log = Logger.getLogger( ConnectionHandler.class ); private static SSLContext sslContext; - private static Map<String, Connection> connections = new ConcurrentHashMap<>(); + private static Map<String, UploadingImage> pendingIncomingUploads = new ConcurrentHashMap<>(); + private static Map<String, DbImage> pendingIncomingDownloads = new ConcurrentHashMap<>(); private static IncomingEvent eventHandler = new ConnectionHandler(); + private static ThreadPoolExecutor uploadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() ); + private static ThreadPoolExecutor downloadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() ); private static Listener listener; @@ -81,153 +88,120 @@ public class ConnectionHandler implements IncomingEvent } /** - * Add a new connection with a unique token. - * To up- or download the file in file path. + * Add a new allowed incoming upload connection + * for the given token and image. * * @param token The unique token - * @param filepath The file to up- or download - * @param type True if upload or false if download - * @return The created connection + * @param image Image being uploaded */ - public static Connection addConnection( String token, String filepath, boolean type ) + public static void addUpload( String token, UploadingImage image ) { - log.debug( "Added connection (" + ( ( type ) ? "uploading" : "downloading" ) + ") with token: '" + token + "'" ); - - Connection connection = new Connection( filepath, type ); - synchronized ( connections ) { - connections.put( token, connection ); - } - return connection; - } - - public static boolean hasConnection( String token ) - { - return connections.containsKey( token ); + pendingIncomingUploads.put( token, image ); + log.debug( "Added upload" ); } - public static void removeConnection( String token ) + /** + * Add a new allowed incoming download connection + * for the given token and image. + * + * @param token The unique token + * @param image Image being uploaded + */ + public static void addDownload( String token, DbImage image ) { - synchronized ( connections ) { - connections.remove( token ); // token is remove, so connections are rejected - } + pendingIncomingDownloads.put( token, image ); + log.debug( "Added download" ); } /** * Server is uploading - client is downloading! */ @Override - public void incomingUploader( Uploader uploader ) throws IOException + public void incomingUploader( final Uploader uploader ) { String token = uploader.getToken(); - log.debug( "Got token :'" + token + "'" ); // check token to identify the client - if (token == null) - { + if ( token == null ) { uploader.sendErrorCode( "No token available." ); - uploader.close(null); + uploader.close( null ); return; } - if ( !connections.containsKey( token ) ) { + + final DbImage image = pendingIncomingDownloads.remove( token ); + if ( image == null ) { uploader.sendErrorCode( "Token not accepted." ); - uploader.close(null); + uploader.close( null ); return; } - // check if he was a downloading client - if ( connections.get( token ).type == Connection.UPLOADING ) { - uploader.sendErrorCode( "You can not download, if you are uploading." ); - uploader.close(null); - return; + try { + uploadPool.execute( new Runnable() { + @Override + public void run() + { + uploader.upload( image.getAbsolutePath() ); + } + } ); + } catch ( RejectedExecutionException e ) { + uploader.sendErrorCode( "Too many concurrent uploads." ); + uploader.close( null ); } - - String fileName = connections.get( token ).filepath; - uploader.upload(fileName); } /** * Server is downloading - client is uploading! */ @Override - public void incomingDownloader( Downloader downloader ) throws IOException + public void incomingDownloader( final Downloader downloader ) throws IOException { log.debug( "Client wants to upload" ); String token = downloader.getToken(); - if (token == null) + if ( token == null ) { downloader.sendErrorCode( "No token available." ); - downloader.close(null); + downloader.close( null ); return; } - // Check token to identify the client. - if ( !connections.containsKey( token ) ) { + + final UploadingImage image = pendingIncomingUploads.remove( token ); + if ( image == null ) { downloader.sendErrorCode( "Token not accepted." ); - downloader.close(null); + downloader.close( null ); return; } - - // check if he was a uploading client - if ( connections.get( token ).type == Connection.DOWNLOADING ) { - downloader.sendErrorCode( "You can not upload, if you are downloading." ); - downloader.close(null); - return; + final MutableInt lastBlock = new MutableInt( -1 ); + + try { + downloadPool.execute( new Runnable() { + @Override + public void run() + { + downloader.download( image.getAbsolutePath(), new WantRangeCallback() { + @Override + public FileRange get() + { + if ( lastBlock.intValue() != -1 ) { + image.setNeedsCheck( lastBlock.intValue() ); + image.increaseTransmittedTimes( lastBlock.intValue() ); + } + // get start of range. + int blockNumber = image.getNextMissingBlock(); + if ( blockNumber == -1 ) + return null; + lastBlock.setValue( blockNumber ); + log.debug( "Block " + blockNumber + " was transmitted " + image.getTimesTransmitted( blockNumber ) + " time(s)." ); + + long startOfRange = image.getNextMissingBlock() * Globals.blockSize; + long endOfRange = Math.min( startOfRange + Globals.blockSize, image.getFileSize() ); + FileRange range = new FileRange( startOfRange, endOfRange ); + return range; + } + } ); + } + } ); + } catch ( RejectedExecutionException e ) { + downloader.sendErrorCode( "Too many concurrent downloads." ); + downloader.close( null ); } - - String destinationFileName = connections.get( token ).filepath; - final UploadingImage image = connections.get( token ).image; - downloader.download( destinationFileName, new WantRangeCallback() { - - @Override - public FileRange get() { - // get start of range. - int blockNumber = image.getNextMissingBlock(); - if (blockNumber == -1) - return null; - - image.setNeedsCheck( blockNumber ); - image.increaseTransmittedTimes( blockNumber ); - log.debug( "Block " + blockNumber + " was transmitted " + image.getTimesTransmitted( blockNumber ) + " time(s)." ); - - long startOfRange = image.getNextMissingBlock() * Globals.blockSize; - long endOfRange = Math.min(startOfRange + Globals.blockSize, image.getImageFile().length()); - FileRange range = new FileRange(startOfRange, endOfRange); - return range; - } - }); -// long startOfRange = 0; -// String token = ""; -// // try to read meta data -// while ( downloader.readMetaData() ) { -// // check token to identify the client -// token = downloader.getToken(); -// if ( !connections.containsKey( token ) ) { -// downloader.sendErrorCode( "Token not accepted." ); -// downloader.close(null); -// return; -// } -// -// startOfRange = downloader.getStartOfRange(); -// -// if ( downloader.getDiffOfRange() <= 0 ) { -// return; -// } -// -// // check if he was a uploading client -// if ( connections.get( token ).type == Connection.DOWNLOADING ) { -// downloader.sendErrorCode( "You can not upload, if you are downloading." ); -// downloader.close(null); -// return; -// } -// -// -// int blockNumber = (int) ( startOfRange / Globals.blockSize ); -// UploadingImage image = connections.get( token ).image; -// image.setNeedsCheck( blockNumber ); -// image.increaseTransmittedTimes( blockNumber ); -// log.debug( "Block " + blockNumber + " was transmitted " + image.getTimesTransmitted( blockNumber ) + " time(s)." ); -// -// downloader.setOutputFilename( connections.get( token ).filepath ); -// downloader.receiveBinary(); -// } - downloader.close(null); } } |