summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java')
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java194
1 files changed, 84 insertions, 110 deletions
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
index 0fb52f5..e6319c9 100644
--- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
@@ -1,6 +1,5 @@
package org.openslx.imagemaster.serverconnection;
-import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -10,14 +9,18 @@ import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
+import org.apache.commons.lang.mutable.MutableInt;
import org.apache.log4j.Logger;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
@@ -26,6 +29,7 @@ import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.imagemaster.Globals;
+import org.openslx.imagemaster.db.DbImage;
/**
* Class to handle all incoming and outgoing connections.
@@ -37,8 +41,11 @@ public class ConnectionHandler implements IncomingEvent
private static Logger log = Logger.getLogger( ConnectionHandler.class );
private static SSLContext sslContext;
- private static Map<String, Connection> connections = new ConcurrentHashMap<>();
+ private static Map<String, UploadingImage> pendingIncomingUploads = new ConcurrentHashMap<>();
+ private static Map<String, DbImage> pendingIncomingDownloads = new ConcurrentHashMap<>();
private static IncomingEvent eventHandler = new ConnectionHandler();
+ private static ThreadPoolExecutor uploadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
+ private static ThreadPoolExecutor downloadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
private static Listener listener;
@@ -81,153 +88,120 @@ public class ConnectionHandler implements IncomingEvent
}
/**
- * Add a new connection with a unique token.
- * To up- or download the file in file path.
+ * Add a new allowed incoming upload connection
+ * for the given token and image.
*
* @param token The unique token
- * @param filepath The file to up- or download
- * @param type True if upload or false if download
- * @return The created connection
+ * @param image Image being uploaded
*/
- public static Connection addConnection( String token, String filepath, boolean type )
+ public static void addUpload( String token, UploadingImage image )
{
- log.debug( "Added connection (" + ( ( type ) ? "uploading" : "downloading" ) + ") with token: '" + token + "'" );
-
- Connection connection = new Connection( filepath, type );
- synchronized ( connections ) {
- connections.put( token, connection );
- }
- return connection;
- }
-
- public static boolean hasConnection( String token )
- {
- return connections.containsKey( token );
+ pendingIncomingUploads.put( token, image );
+ log.debug( "Added upload" );
}
- public static void removeConnection( String token )
+ /**
+ * Add a new allowed incoming download connection
+ * for the given token and image.
+ *
+ * @param token The unique token
+ * @param image Image being uploaded
+ */
+ public static void addDownload( String token, DbImage image )
{
- synchronized ( connections ) {
- connections.remove( token ); // token is remove, so connections are rejected
- }
+ pendingIncomingDownloads.put( token, image );
+ log.debug( "Added download" );
}
/**
* Server is uploading - client is downloading!
*/
@Override
- public void incomingUploader( Uploader uploader ) throws IOException
+ public void incomingUploader( final Uploader uploader )
{
String token = uploader.getToken();
- log.debug( "Got token :'" + token + "'" );
// check token to identify the client
- if (token == null)
- {
+ if ( token == null ) {
uploader.sendErrorCode( "No token available." );
- uploader.close(null);
+ uploader.close( null );
return;
}
- if ( !connections.containsKey( token ) ) {
+
+ final DbImage image = pendingIncomingDownloads.remove( token );
+ if ( image == null ) {
uploader.sendErrorCode( "Token not accepted." );
- uploader.close(null);
+ uploader.close( null );
return;
}
- // check if he was a downloading client
- if ( connections.get( token ).type == Connection.UPLOADING ) {
- uploader.sendErrorCode( "You can not download, if you are uploading." );
- uploader.close(null);
- return;
+ try {
+ uploadPool.execute( new Runnable() {
+ @Override
+ public void run()
+ {
+ uploader.upload( image.getAbsolutePath() );
+ }
+ } );
+ } catch ( RejectedExecutionException e ) {
+ uploader.sendErrorCode( "Too many concurrent uploads." );
+ uploader.close( null );
}
-
- String fileName = connections.get( token ).filepath;
- uploader.upload(fileName);
}
/**
* Server is downloading - client is uploading!
*/
@Override
- public void incomingDownloader( Downloader downloader ) throws IOException
+ public void incomingDownloader( final Downloader downloader ) throws IOException
{
log.debug( "Client wants to upload" );
String token = downloader.getToken();
- if (token == null)
+ if ( token == null )
{
downloader.sendErrorCode( "No token available." );
- downloader.close(null);
+ downloader.close( null );
return;
}
- // Check token to identify the client.
- if ( !connections.containsKey( token ) ) {
+
+ final UploadingImage image = pendingIncomingUploads.remove( token );
+ if ( image == null ) {
downloader.sendErrorCode( "Token not accepted." );
- downloader.close(null);
+ downloader.close( null );
return;
}
-
- // check if he was a uploading client
- if ( connections.get( token ).type == Connection.DOWNLOADING ) {
- downloader.sendErrorCode( "You can not upload, if you are downloading." );
- downloader.close(null);
- return;
+ final MutableInt lastBlock = new MutableInt( -1 );
+
+ try {
+ downloadPool.execute( new Runnable() {
+ @Override
+ public void run()
+ {
+ downloader.download( image.getAbsolutePath(), new WantRangeCallback() {
+ @Override
+ public FileRange get()
+ {
+ if ( lastBlock.intValue() != -1 ) {
+ image.setNeedsCheck( lastBlock.intValue() );
+ image.increaseTransmittedTimes( lastBlock.intValue() );
+ }
+ // get start of range.
+ int blockNumber = image.getNextMissingBlock();
+ if ( blockNumber == -1 )
+ return null;
+ lastBlock.setValue( blockNumber );
+ log.debug( "Block " + blockNumber + " was transmitted " + image.getTimesTransmitted( blockNumber ) + " time(s)." );
+
+ long startOfRange = image.getNextMissingBlock() * Globals.blockSize;
+ long endOfRange = Math.min( startOfRange + Globals.blockSize, image.getFileSize() );
+ FileRange range = new FileRange( startOfRange, endOfRange );
+ return range;
+ }
+ } );
+ }
+ } );
+ } catch ( RejectedExecutionException e ) {
+ downloader.sendErrorCode( "Too many concurrent downloads." );
+ downloader.close( null );
}
-
- String destinationFileName = connections.get( token ).filepath;
- final UploadingImage image = connections.get( token ).image;
- downloader.download( destinationFileName, new WantRangeCallback() {
-
- @Override
- public FileRange get() {
- // get start of range.
- int blockNumber = image.getNextMissingBlock();
- if (blockNumber == -1)
- return null;
-
- image.setNeedsCheck( blockNumber );
- image.increaseTransmittedTimes( blockNumber );
- log.debug( "Block " + blockNumber + " was transmitted " + image.getTimesTransmitted( blockNumber ) + " time(s)." );
-
- long startOfRange = image.getNextMissingBlock() * Globals.blockSize;
- long endOfRange = Math.min(startOfRange + Globals.blockSize, image.getImageFile().length());
- FileRange range = new FileRange(startOfRange, endOfRange);
- return range;
- }
- });
-// long startOfRange = 0;
-// String token = "";
-// // try to read meta data
-// while ( downloader.readMetaData() ) {
-// // check token to identify the client
-// token = downloader.getToken();
-// if ( !connections.containsKey( token ) ) {
-// downloader.sendErrorCode( "Token not accepted." );
-// downloader.close(null);
-// return;
-// }
-//
-// startOfRange = downloader.getStartOfRange();
-//
-// if ( downloader.getDiffOfRange() <= 0 ) {
-// return;
-// }
-//
-// // check if he was a uploading client
-// if ( connections.get( token ).type == Connection.DOWNLOADING ) {
-// downloader.sendErrorCode( "You can not upload, if you are downloading." );
-// downloader.close(null);
-// return;
-// }
-//
-//
-// int blockNumber = (int) ( startOfRange / Globals.blockSize );
-// UploadingImage image = connections.get( token ).image;
-// image.setNeedsCheck( blockNumber );
-// image.increaseTransmittedTimes( blockNumber );
-// log.debug( "Block " + blockNumber + " was transmitted " + image.getTimesTransmitted( blockNumber ) + " time(s)." );
-//
-// downloader.setOutputFilename( connections.get( token ).filepath );
-// downloader.receiveBinary();
-// }
- downloader.close(null);
}
}