diff options
| author | Nils Schwabe | 2014-07-04 12:50:59 +0200 |
|---|---|---|
| committer | Nils Schwabe | 2014-07-04 12:50:59 +0200 |
| commit | d1ab55e39b392e62ca1f1e41e3c70cf48d676885 (patch) | |
| tree | 2fcf410f3a7ff4a3acae9ad5502fc933f8887e2c /src/main/java/org/openslx/imagemaster/serverconnection | |
| parent | Fix that Globals are initialized before servers are starting up (diff) | |
| download | masterserver-d1ab55e39b392e62ca1f1e41e3c70cf48d676885.tar.gz masterserver-d1ab55e39b392e62ca1f1e41e3c70cf48d676885.tar.xz masterserver-d1ab55e39b392e62ca1f1e41e3c70cf48d676885.zip | |
Change handling of connections and add support for download
Diffstat (limited to 'src/main/java/org/openslx/imagemaster/serverconnection')
| -rw-r--r-- | src/main/java/org/openslx/imagemaster/serverconnection/ConnectionData.java | 45 | ||||
| -rw-r--r-- | src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java | 66 | ||||
| -rw-r--r-- | src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClientInfos.java | 74 | ||||
| -rw-r--r-- | src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java | 88 | ||||
| -rw-r--r-- | src/main/java/org/openslx/imagemaster/serverconnection/UploadingImageInfos.java (renamed from src/main/java/org/openslx/imagemaster/serverconnection/ImageInfos.java) | 4 |
5 files changed, 256 insertions, 21 deletions
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionData.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionData.java new file mode 100644 index 0000000..aeba3b9 --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionData.java @@ -0,0 +1,45 @@ +package org.openslx.imagemaster.serverconnection; + +import org.openslx.filetransfer.Listener; + +public class ConnectionData +{ + /** + * Where the file is stored locally. + */ + protected final String filepath; + + /** + * Type of this connection. + * True if uploading, false if downloading + */ + protected final boolean type; + public final static boolean UPLOADING = true; + public final static boolean DOWNLOADING = false; + + /** + * The active listener thread that listens for incoming connections. + */ + protected final Listener listenerThread; + + /** + * Create a new connection data + * @param filepath Where the file is stored locally + * @param type True if uploading, false if downloading + * @param listenerThread The active listener thread that listens for incoming connections + */ + protected ConnectionData(String filepath, boolean type, Listener listenerThread) + { + this.filepath = filepath; + this.type = type; + this.listenerThread = listenerThread; + } + + /** + * The port where this connection is running. + * @return The port where this connection is running. + */ + protected int getPort() { + return this.listenerThread.getPort(); + } +} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java index 498e058..1a2ff0b 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java @@ -1,5 +1,6 @@ package org.openslx.imagemaster.serverconnection; +import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -24,8 +25,6 @@ import org.openslx.filetransfer.IncomingEvent; import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; import org.openslx.imagemaster.Globals; -import org.openslx.imagemaster.db.DbImage; -import org.openslx.imagemaster.util.Tuple; /** * Class to handle all incoming and outgoing connections. @@ -38,7 +37,7 @@ public class ConnectionHandler implements IncomingEvent * Key: token, * Value: Tuple of the listener and the filepath. */ - private static Map<String, Tuple<Thread, String>> activeListeners = new HashMap<>(); + private static Map<String, ConnectionData> activeListeners = new HashMap<>(); private static List<Integer> possiblePorts = new LinkedList<>(); private static IncomingEvent eventHandler = new ConnectionHandler(); @@ -68,29 +67,43 @@ public class ConnectionHandler implements IncomingEvent sslContext.init(keyManagers, null, null); } catch (FileNotFoundException e) { log.error( "Could not find keystore." ); + System.exit( 2 ); } catch ( KeyStoreException e ) { log.error( "KeyStore implemenation not supported." ); + System.exit( 2 ); } catch ( NoSuchAlgorithmException e ) { log.error( "Could not find such Algorithm" ); + System.exit( 2 ); } catch ( CertificateException e ) { log.error( "Certificate unvalid." ); + System.exit( 2 ); } catch ( IOException e ) { log.error( "Could not read keyfile" ); + System.exit( 2 ); } catch ( UnrecoverableKeyException e ) { log.error( "Key in keystore is not valid" ); + System.exit( 2 ); } catch ( KeyManagementException e ) { log.error( "Context initialization failed." ); + System.exit( 2 ); } } - public static void addConnection(String token, String filepath) + /** + * Add a new connection with a unique token. + * Tp up- or download the file in filepath. + * @param token The unique token + * @param filepath The file to up- or download + * @param type True if upload or false if download + */ + public static void addConnection(String token, String filepath, boolean type) { int port = possiblePorts.remove( 0 ); Listener listener = new Listener( eventHandler, sslContext, port ); listener.start(); - activeListeners.put( token, new Tuple<Thread, String>(listener, filepath) ); + activeListeners.put( token, new ConnectionData(filepath, type, listener) ); } public static boolean hasConnection( String token ) @@ -100,7 +113,7 @@ public class ConnectionHandler implements IncomingEvent public static void removeConnection( String token ) { - Listener l = (Listener)activeListeners.remove( token ).x; + Listener l = activeListeners.remove( token ).listenerThread; l.interrupt(); possiblePorts.add(l.getPort()); // add port back to possible's list } @@ -111,7 +124,29 @@ public class ConnectionHandler implements IncomingEvent @Override public void incomingUploader( Uploader uploader ) throws IOException { - // TODO: Handle incoming uploads (client download requests) + // try to read meta data + while ( uploader.readMetaData() ) { + String token = uploader.getToken(); + // check token to identify the client + if ( !activeListeners.containsKey( token )) { + uploader.sendErrorCode( "Token not accepted." ); + uploader.close(); + return; + } + + // check if he was a downloading client + if ( activeListeners.get( token ).type == ConnectionData.UPLOADING ) { + uploader.sendErrorCode( "You can not download, if you are uploading." ); + uploader.close(); + return; + } + // TODO: check which range needs to be sent and send this range + long length = ( new File( activeListeners.get( token ).filepath ) ).length(); + + uploader.sendRange(0, (int)length); + uploader.sendFile( activeListeners.get( token ).filepath ); + } + uploader.close(); } /** @@ -121,15 +156,24 @@ public class ConnectionHandler implements IncomingEvent public void incomingDownloader( Downloader downloader ) throws IOException { // try to read meta data - while (downloader.readMetaData()) { + while ( downloader.readMetaData() ) { // check token to identify the client String token = downloader.getToken(); - if (!activeListeners.containsKey( token )) { + if ( !activeListeners.containsKey( token ) ) { + downloader.sendErrorCode( "Token not accepted." ); + downloader.close(); + return; + } + // check if he was a uploading client + if ( activeListeners.get( token ).type == ConnectionData.DOWNLOADING ) { + downloader.sendErrorCode( "You can not upload, if you are downloading." ); + downloader.close(); return; } - downloader.setOutputFilename( activeListeners.get( token ).y ); + + downloader.setOutputFilename( activeListeners.get( token ).filepath ); downloader.readBinary(); - downloader.close(); } + downloader.close(); } } diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClientInfos.java b/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClientInfos.java new file mode 100644 index 0000000..a403c84 --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClientInfos.java @@ -0,0 +1,74 @@ +package org.openslx.imagemaster.serverconnection; + +import java.util.HashMap; +import java.util.List; + +/** + * Helper class for the ImageProcessor to know some things about the downloading client + * + */ +public class DownloadingClientInfos +{ + + public final HashMap<String, ImageInfos> downloadingImages = new HashMap<>(); + + DownloadingClientInfos() + { + } + + public void addDownload( String uuid, List<Integer> list, String token ) + { + downloadingImages.put( uuid, new ImageInfos( uuid, list, token ) ); + } + + public void removeDownload( String uuid ) + { + downloadingImages.remove( uuid ); + } + + public boolean isDownloading( String uuid ) + { + return downloadingImages.containsKey( uuid ); + } + + public boolean hasDownloads() + { + return (downloadingImages.size() > 0); + } + + public List<Integer> getLastRequestedBlocks( String uuid ) + { + if ( !downloadingImages.containsKey( uuid ) ) + return null; + return downloadingImages.get( uuid ).lastRequestedBlocks; + } + + public void requestBlocks( String uuid, List<Integer> list ) + { + if ( !downloadingImages.containsKey( uuid ) ) + return; + downloadingImages.get( uuid ).lastRequestedBlocks = list; + } + + public String getToken( String uuid ) + { + if ( !downloadingImages.containsKey( uuid ) ) + return null; + return downloadingImages.get( uuid ).token; + } + + class ImageInfos + { + + public final String uuid; + public final String token; + private List<Integer> lastRequestedBlocks; + + ImageInfos(String uuid, List<Integer> list, String token) + { + this.uuid = uuid; + this.lastRequestedBlocks = list; + this.token = token; + } + } +} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java b/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java index 71ef97b..db3702d 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java @@ -8,10 +8,15 @@ import java.util.List; import org.apache.log4j.Logger; import org.openslx.imagemaster.Globals; import org.openslx.imagemaster.db.DbImage; +import org.openslx.imagemaster.thrift.iface.DownloadInfos; import org.openslx.imagemaster.thrift.iface.ImageData; import org.openslx.imagemaster.thrift.iface.UploadInfos; import org.openslx.imagemaster.util.RandomString; +/** + * Processing the up- and download of images. + * Handles who is authorized and knows which blocks are missing / need to be sent + */ public class ImageProcessor { @@ -24,9 +29,16 @@ public class ImageProcessor /** * The uploading images. * Key: imageUUID, - * Value: imageInfos + * Value: uploadingImageInfos */ - private static HashMap<String, ImageInfos> uploadingImages = new HashMap<>(); + private static HashMap<String, UploadingImageInfos> uploadingImages = new HashMap<>(); + + /** + * The downloading clients. + * Key: serverSessionId + * Value: downloadingClientInfos + */ + private static HashMap<String, DownloadingClientInfos> downloadingClients = new HashMap<>(); /** * Checks if this image is already uploading and returns a new list with missing blocks if so. @@ -49,13 +61,13 @@ public class ImageProcessor if ( missing.isEmpty() ) { String token = uploadingImages.get( uuid ).getToken(); uploadDone( uuid ); - return new UploadInfos( token, missing ); + return new UploadInfos( null, missing ); } uploadingImages.get( uuid ).setLastSentBlocks( missing ); return new UploadInfos( uploadingImages.get( uuid ).getToken(), missing ); } - // insert new image and generate token + // insert new image and start listener synchronized ( uploadingImages ) { int nBlocks = (int)Math.ceil( imageData.fileSize / Globals.blockSize ); List<Integer> allBlocks = new LinkedList<>(); @@ -64,11 +76,11 @@ public class ImageProcessor } String token = RandomString.generate( 100, false ); String filepath = Globals.getImageDir() + "/" + uuid + ".vmdk"; - ConnectionHandler.addConnection( token, filepath ); + ConnectionHandler.addConnection( token, filepath, ConnectionData.UPLOADING ); // TODO: proper synchronization, interface is multi threaded. // should synchronize operations on the map (use concurrent map) and then synchronize on the uploading image // when handing the missing blocks etc... - uploadingImages.put( uuid, new ImageInfos( token, allBlocks, serverSessionId, new Timestamp( System.currentTimeMillis() ) ) ); + uploadingImages.put( uuid, new UploadingImageInfos( token, allBlocks, serverSessionId, new Timestamp( System.currentTimeMillis() ) ) ); DbImage.insert( imageData, System.currentTimeMillis(), token, allBlocks, serverSessionId, filepath ); List<Integer> missing = getMissingBlocks( uuid, AMOUNT ); @@ -81,6 +93,62 @@ public class ImageProcessor } } + public static DownloadInfos getDownloadInfos( String serverSessionId, String uuid, List<Integer> requestedBlocks ) + { + // check if server is already downloading + if ( downloadingClients.containsKey( serverSessionId ) ) { + DownloadingClientInfos client = downloadingClients.get( serverSessionId ); + + // remove download if done + if ( requestedBlocks.isEmpty() ) + { + downloadDone( serverSessionId, uuid ); + return new DownloadInfos(); + } + + if ( client.isDownloading( uuid ) ) + { + // client was downloading this image + // update the requested blocks + client.requestBlocks( uuid, requestedBlocks ); + return new DownloadInfos( client.getToken( uuid ) ); + } + + // server was downloading another image and now gets a new connection for this new download + String token = RandomString.generate( 100, false ); + String filepath = DbImage.getImageByUUID( uuid ).imagePath; + client.addDownload( uuid, requestedBlocks, token ); + + downloadingClients.put( serverSessionId, client ); + ConnectionHandler.addConnection( token, filepath, ConnectionData.DOWNLOADING ); + return new DownloadInfos( token ); + } + + // insert new client and start listener + synchronized ( downloadingClients ) { + DownloadingClientInfos client = new DownloadingClientInfos(); + String token = RandomString.generate( 100, false ); + String filepath = DbImage.getImageByUUID( uuid ).imagePath; + client.addDownload( uuid, requestedBlocks, token ); + + downloadingClients.put( serverSessionId, client ); + ConnectionHandler.addConnection( token, filepath, ConnectionData.DOWNLOADING ); + return new DownloadInfos( token ); + } + } + + private static void downloadDone( String serverSessionId, String uuid ) + { + synchronized ( downloadingClients ) { + DownloadingClientInfos client = downloadingClients.get( serverSessionId ); + client.removeDownload( uuid ); + ConnectionHandler.removeConnection( client.getToken( uuid ) ); + if ( !client.hasDownloads() ) { + downloadingClients.remove( serverSessionId ); + } + } + } + /** * Returns a specified number of missing blocks. * @@ -128,10 +196,14 @@ public class ImageProcessor List<DbImage> list = DbImage.getUploadingImages(); for ( DbImage image : list ) { String token = image.token; - ConnectionHandler.addConnection( token, image.imagePath ); - ImageInfos infos = new ImageInfos( token, image.missingBlocks, image.serverSessionId, image.timestamp ); + ConnectionHandler.addConnection( token, image.imagePath, ConnectionData.UPLOADING ); + UploadingImageInfos infos = new UploadingImageInfos( token, image.missingBlocks, image.serverSessionId, image.timestamp ); uploadingImages.put( image.UUID, infos ); } log.info( "Added " + list.size() + " pending upload(s) to process list again." ); } + + public static void init() + { + } } diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ImageInfos.java b/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImageInfos.java index 1e84978..17debbe 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ImageInfos.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImageInfos.java @@ -8,7 +8,7 @@ import java.util.List; /** * Helper class for ImageProcessor to save some infos about the images in the process list. */ -public class ImageInfos +public class UploadingImageInfos { /** * Token for the satellite. @@ -26,7 +26,7 @@ public class ImageInfos private String serverSessionId; private Timestamp ts; // when did the server something for the last time - protected ImageInfos(String token, List<Integer> missingBlocks, String serverSessionId, Timestamp ts) + protected UploadingImageInfos(String token, List<Integer> missingBlocks, String serverSessionId, Timestamp ts) { this.token = token; this.missingBlocks = missingBlocks; |
