From c89abbfa830876b7298eb96896a642bc74589651 Mon Sep 17 00:00:00 2001 From: Nils Schwabe Date: Thu, 10 Jul 2014 16:08:36 +0200 Subject: Add some better thread synchonization Restructure some classes Fix small connection issues --- src/main/java/org/openslx/imagemaster/Globals.java | 115 +++++++++------- .../org/openslx/imagemaster/db/DbSatellite.java | 13 +- .../org/openslx/imagemaster/server/ApiServer.java | 109 --------------- .../imagemaster/serverconnection/CRCScheduler.java | 6 +- .../imagemaster/serverconnection/Connection.java | 30 +++++ .../serverconnection/ConnectionData.java | 45 ------- .../serverconnection/ConnectionHandler.java | 138 ++++++++++--------- .../serverconnection/DownloadingClient.java | 75 +++++++++++ .../serverconnection/DownloadingClientInfos.java | 82 ----------- .../serverconnection/ImageProcessor.java | 150 +++++++++++---------- .../serverconnection/UploadingImage.java | 114 ++++++++++++++++ .../serverconnection/UploadingImageInfos.java | 115 ---------------- .../org/openslx/imagemaster/util/ByteArray.java | 13 ++ 13 files changed, 454 insertions(+), 551 deletions(-) create mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/Connection.java delete mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/ConnectionData.java create mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClient.java delete mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClientInfos.java create mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/UploadingImage.java delete mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/UploadingImageInfos.java create mode 100644 src/main/java/org/openslx/imagemaster/util/ByteArray.java (limited to 'src') diff --git a/src/main/java/org/openslx/imagemaster/Globals.java b/src/main/java/org/openslx/imagemaster/Globals.java index 3675d96..b6848bb 100644 --- a/src/main/java/org/openslx/imagemaster/Globals.java +++ b/src/main/java/org/openslx/imagemaster/Globals.java @@ -15,11 +15,14 @@ public class Globals private static Logger log = Logger.getLogger( Globals.class ); private static final Properties properties = new Properties(); public final static int blockSize = 16 * 1024 * 1024; - - public static void init() {} + + public static void init() + { + } /** * Loads the properties from config/global.properties + * * @throws IOException */ static @@ -29,7 +32,7 @@ public class Globals BufferedInputStream stream = new BufferedInputStream( new FileInputStream( "config/global.properties" ) ); properties.load( stream ); stream.close(); - + // check properties Util.notNullOrEmptyFatal( getImageDir(), "Image directory must be set." ); Util.notNullOrEmptyFatal( getLdapHost(), "Ldap host must be set." ); @@ -41,13 +44,13 @@ public class Globals Util.notNullOrEmptyFatal( getSslKeystoreFile(), "SSL keystore file must be set." ); Util.notNullOrEmptyFatal( getSslKeystoreAlias(), "SSL keystore alias must be set." ); Util.notNullOrEmptyFatal( getSslKeystorePassword(), "SSL keystore password must be set." ); - + Util.notNullFatal( getLdapPort(), "Ldap port must be set." ); Util.notNullFatal( getSessionTimeoutUser(), "Session timeout user must be set." ); Util.notNullFatal( getSessionTimeoutServer(), "Session timeout server must be set." ); - Util.notNullFatal( getSslPort(), "SSL socket port must be set." ); + Util.notNullFatal( getSslSocketPort(), "SSL socket port must be set." ); Util.notNullFatal( getSslTimeout(), "SSL socket timeout must be set." ); - + // check ldap_bind_query if ( StringUtils.countMatches( getLdapBindQuery(), "%" ) == 0 ) { log.fatal( "ldap_bind_query does not contain '%'" ); @@ -55,13 +58,13 @@ public class Globals } // check ldap_search_filter - if ( StringUtils.countMatches( getLdapSearchFilter(), "%" ) == 0) { + if ( StringUtils.countMatches( getLdapSearchFilter(), "%" ) == 0 ) { log.fatal( "ldap_search_filter does not contain '%'" ); System.exit( 2 ); } - + // check keystore - if ( !getSslKeystoreFile().endsWith( ".jks" )) { + if ( !getSslKeystoreFile().endsWith( ".jks" ) ) { log.fatal( "Keystore is not in jks format." ); System.exit( 2 ); } @@ -71,82 +74,98 @@ public class Globals if ( image.endsWith( "/" ) ) { Globals.properties.put( "image_dir", image.substring( 0, image.length() - 1 ) ); } - - } catch (IOException e) { + + } catch ( IOException e ) { log.fatal( "Could not load properties!" ); log.warn( e.getStackTrace().toString() ); System.exit( 2 ); } log.info( "Loaded properties successfully" ); } - + /* INTEGERS */ - - public static int getLdapPort() { + + public static int getLdapPort() + { return Util.tryToParseInt( properties.getProperty( "ldap_port" ) ); } - - public static int getSessionTimeoutUser() { + + public static int getSessionTimeoutUser() + { return Util.tryToParseInt( properties.getProperty( "session_timeout_user" ) ); } - - public static int getSessionTimeoutServer() { + + public static int getSessionTimeoutServer() + { return Util.tryToParseInt( properties.getProperty( "session_timeout_user" ) ); } - - public static int getSslPort() { - return Util.tryToParseInt( properties.getProperty( "ssl_port" ) ); + + public static int getSslSocketPort() + { + return Util.tryToParseInt( properties.getProperty( "ssl_socket_port" ) ); } - - public static int getSslTimeout() { - return Util.tryToParseInt( properties.getProperty( "ssl_timeout" ) ); + + public static int getSslTimeout() + { + return Util.tryToParseInt( properties.getProperty( "ssl_socket_timeout" ) ); } - + /* STRINGS */ - - public static String getImageDir() { + + public static String getImageDir() + { return properties.getProperty( "image_dir" ); } - - public static String getSslKeystoreFile() { + + public static String getSslKeystoreFile() + { return properties.getProperty( "ssl_keystore_file" ); } - - public static String getSslKeystoreAlias() { + + public static String getSslKeystoreAlias() + { return properties.getProperty( "ssl_keystore_alias" ); } - - public static String getSslKeystorePassword() { + + public static String getSslKeystorePassword() + { return properties.getProperty( "ssl_keystore_password" ); } - - public static String getLdapHost() { + + public static String getLdapHost() + { return properties.getProperty( "ldap_host" ); } - - public static String getLdapBindQuery() { + + public static String getLdapBindQuery() + { return properties.getProperty( "ldap_bind_query" ); } - - public static String getLdapSearchBaseDn() { + + public static String getLdapSearchBaseDn() + { return properties.getProperty( "ldap_search_base_dn" ); } - - public static String getLdapSearchFilter() { + + public static String getLdapSearchFilter() + { return properties.getProperty( "ldap_search_filter" ); } - - public static String getLdapKeystorePassword() { + + public static String getLdapKeystorePassword() + { return properties.getProperty( "ldap_keystore_password" ); } - - public static String getLdapKeystorePath() { + + public static String getLdapKeystorePath() + { return properties.getProperty( "ldap_keystore_path" ); } - + /* BOOLEANS */ - - public static boolean getLdapSsl() { + + public static boolean getLdapSsl() + { return Boolean.valueOf( properties.getProperty( "ldap_ssl" ) ); } } diff --git a/src/main/java/org/openslx/imagemaster/db/DbSatellite.java b/src/main/java/org/openslx/imagemaster/db/DbSatellite.java index ed88642..f8bf036 100644 --- a/src/main/java/org/openslx/imagemaster/db/DbSatellite.java +++ b/src/main/java/org/openslx/imagemaster/db/DbSatellite.java @@ -1,5 +1,7 @@ package org.openslx.imagemaster.db; +import org.openslx.imagemaster.util.ByteArray; + public class DbSatellite { @@ -56,15 +58,4 @@ public class DbSatellite { return MySQL.findUniqueOrNull( ByteArray.class, "SELECT publickey FROM satellite WHERE organization = ?", organization ).array; } - - class ByteArray - { - - public final byte[] array; - - ByteArray(byte[] array) - { - this.array = array; - } - } } diff --git a/src/main/java/org/openslx/imagemaster/server/ApiServer.java b/src/main/java/org/openslx/imagemaster/server/ApiServer.java index 237789d..d4ccb08 100644 --- a/src/main/java/org/openslx/imagemaster/server/ApiServer.java +++ b/src/main/java/org/openslx/imagemaster/server/ApiServer.java @@ -83,40 +83,6 @@ public class ApiServer return new UserInfo( session.getUserId(), session.getFirstName(), session.getLastName(), session.getEMail() ); } -// TODO: Remove old code that's not needed anymore - that's what a vcs is for... -// /** -// * Request ftp credentials to upload a new image to the masterserver. -// * -// * @param imageDescription MetaData of the new image -// * @param serverSessionData the session data of the authenticated uni/hs server -// * @return the genereated ftp credentials -// * @throws AuthorizationException if the uni/hs server has no valid session -// * @throws TException -// */ -// public static FtpCredentials submitImage( String serverSessionId, -// ImageData imageDescription ) throws AuthorizationException, ImageDataException -// { -// if ( ServerSessionManager.getSession( serverSessionId ) == null ) { -// throw new AuthorizationException( AuthorizationError.NOT_AUTHENTICATED, "No valid serverSessionData" ); -// } -// -// // create new user -// FtpCredentials ftpCredentials = App.ftpServer.addUser( serverSessionId , MasterFtpServer.Mode.UPLOADING, ""); -// -// if ( ftpCredentials == null ) { -// log.error( "Could not create ftp credentials" ); -// return null; -// } -// -// try { -// ImageProcessor.addImageDataToProcess( imageDescription, ftpCredentials.username, ftpCredentials.password ); -// } catch (ImageDataException e) { -// App.ftpServer.removeUser( serverSessionId ); -// throw new ImageDataException( ImageDataError.INVALID_DATA, e.getMessage() ); -// } -// -// return ftpCredentials; -// } public static UploadInfos submitImage( String serverSessionId, ImageData imageDescription, List crcSums ) throws AuthorizationException, ImageDataException, UploadException { @@ -189,79 +155,4 @@ public class ApiServer final ServerSession session = new ServerSession( serverUser ); return ServerSessionManager.addSession( session ); } - -// /** -// * Tell the masterserver that the image upload finished. -// * -// * @param ftpUser the user that was used to upload -// * @param imageDescription the description of the uploaded image -// * @return if nothing went wrong -// * @throws ImageDataException if image was not submitted before -// * @throws AuthorizationException if no valid session exists -// */ -// public static boolean finishedUpload( String ftpUser, ImageData imageDescription ) throws ImageDataException -// { -// // check if user is valid -// synchronized ( App.ftpServer.users ) { -// if (!App.ftpServer.users.containsKey( ftpUser )) { -// throw new ImageDataException( ImageDataError.UNKNOWN_IMAGE, "Image with this data was not submitted before." ); -// } else { -// if ( App.ftpServer.users.get( ftpUser ).getMode() == MasterFtpServer.Mode.DOWNLOADING ) { -// throw new ImageDataException( ImageDataError.UNKNOWN_IMAGE, "Your were downloading and not uploading." ); -// } -// } -// } -// -// // process the image -// File userDirectory = new File( Globals.getPropertyString( Globals.PropString.FTPBASEDIR ) + "/" + ftpUser ); -// File[] list = userDirectory.listFiles(); -// -// if ( list == null || list.length != 1 ) { -// // user uploaded too many files -// return false; -// } -// -// log.info( ftpUser + " is done with upload" ); -// -// ImageProcessor.processImageAfterUpload( ftpUser, list[0].getName() ); -// -// // remove user that is not needed anymore -// App.ftpServer.removeUser( ftpUser ); -// log.info( "Removed user: " + ftpUser ); -// -// return true; -// } - -// public static FtpCredentials getImage( String uuid, String serverSessionId ) throws AuthorizationException, ImageDataException -// { -// if ( ServerSessionManager.getSession( serverSessionId ) == null ) { -// throw new AuthorizationException( AuthorizationError.NOT_AUTHENTICATED, "No valid serverSessionData" ); -// } -// -// // check if image exists -// DbImage image = DbImage.getImageByUUID( uuid ); -// -// if (image == null) throw new ImageDataException( ImageDataError.UNKNOWN_IMAGE, "No image found with uuid '" + uuid + "'"); -// -// FtpCredentials ftpCredentials = App.ftpServer.addUser( serverSessionId, MasterFtpServer.Mode.DOWNLOADING, new File(image.imagePath).getName() ); -// -// // TODO: what is happening here? -// if (ftpCredentials == null) return null; -// -// return ftpCredentials; -// } - -// public static boolean finishedDownload( String ftpUser ) -// { -// if ( !App.ftpServer.users.containsKey( ftpUser )) -// return false; -// -// log.info( "User: '" + ftpUser + "' finished download and gets deleted." ); -// -// // download is done, user can be removed now -// App.ftpServer.removeUser( ftpUser ); -// -// return true; -// } - } diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/CRCScheduler.java b/src/main/java/org/openslx/imagemaster/serverconnection/CRCScheduler.java index b1fce15..a817738 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/CRCScheduler.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/CRCScheduler.java @@ -18,10 +18,10 @@ public class CRCScheduler extends TimerTask @Override public void run() { - List list = ImageProcessor.getImagesToCheck(); - Iterator iter = list.iterator(); + List list = ImageProcessor.getImagesToCheck(); + Iterator iter = list.iterator(); while ( iter.hasNext() ) { - UploadingImageInfos image = iter.next(); + UploadingImage image = iter.next(); List blocks = image.getNotCheckedBlocks(); List finishedBlocks = new LinkedList<>(); try { diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/Connection.java b/src/main/java/org/openslx/imagemaster/serverconnection/Connection.java new file mode 100644 index 0000000..c6606ac --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/serverconnection/Connection.java @@ -0,0 +1,30 @@ +package org.openslx.imagemaster.serverconnection; + +public class Connection +{ + /** + * Where the file is stored locally. + */ + protected final String filepath; + + /** + * Type of this connection. + * True if uploading, false if downloading + */ + protected final boolean type; + public UploadingImage image = null; + public final static boolean UPLOADING = true; + public final static boolean DOWNLOADING = false; + + /** + * Create a new connection data + * @param filepath Where the file is stored locally + * @param type True if uploading, false if downloading + * @param listenerThread The active listener thread that listens for incoming connections + */ + protected Connection(String filepath, boolean type) + { + this.filepath = filepath; + this.type = type; + } +} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionData.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionData.java deleted file mode 100644 index aeba3b9..0000000 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionData.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.openslx.imagemaster.serverconnection; - -import org.openslx.filetransfer.Listener; - -public class ConnectionData -{ - /** - * Where the file is stored locally. - */ - protected final String filepath; - - /** - * Type of this connection. - * True if uploading, false if downloading - */ - protected final boolean type; - public final static boolean UPLOADING = true; - public final static boolean DOWNLOADING = false; - - /** - * The active listener thread that listens for incoming connections. - */ - protected final Listener listenerThread; - - /** - * Create a new connection data - * @param filepath Where the file is stored locally - * @param type True if uploading, false if downloading - * @param listenerThread The active listener thread that listens for incoming connections - */ - protected ConnectionData(String filepath, boolean type, Listener listenerThread) - { - this.filepath = filepath; - this.type = type; - this.listenerThread = listenerThread; - } - - /** - * The port where this connection is running. - * @return The port where this connection is running. - */ - protected int getPort() { - return this.listenerThread.getPort(); - } -} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java index 0d400db..f1f8887 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java @@ -10,10 +10,10 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; -import java.util.HashMap; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; @@ -31,42 +31,30 @@ import org.openslx.imagemaster.Globals; */ public class ConnectionHandler implements IncomingEvent { + private static Logger log = Logger.getLogger( ConnectionHandler.class ); private static SSLContext sslContext; - /** - * Key: token, - * Value: Tuple of the listener and the filepath. - */ - private static Map activeListeners = new HashMap<>(); - private static List possiblePorts = new LinkedList<>(); + + private static Map connections = new ConcurrentHashMap<>(); private static IncomingEvent eventHandler = new ConnectionHandler(); - // TODO: There should only ever be one Listener instance in the whole application, running on a fixed port - + + private static Listener listener; + static { - possiblePorts.add( 1234 ); - possiblePorts.add( 1235 ); - possiblePorts.add( 1236 ); - possiblePorts.add( 1237 ); - possiblePorts.add( 1238 ); - possiblePorts.add( 1239 ); - possiblePorts.add( 1240 ); - possiblePorts.add( 1241 ); - possiblePorts.add( 1242 ); - possiblePorts.add( 1243 ); - possiblePorts.add( 1244 ); - + log.debug( "Starting listener on port " + Globals.getSslSocketPort() ); try { - String pathToKeyStore = Globals.getSslKeystoreFile(); - char[] passphrase = Globals.getSslKeystorePassword().toCharArray(); - KeyStore keystore = KeyStore.getInstance("JKS"); - keystore.load(new FileInputStream(pathToKeyStore), passphrase); - KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - kmf.init(keystore, passphrase); - sslContext = SSLContext.getInstance("SSLv3"); - KeyManager[] keyManagers = kmf.getKeyManagers(); - - sslContext.init(keyManagers, null, null); - } catch (FileNotFoundException e) { + String pathToKeyStore = Globals.getSslKeystoreFile(); + char[] passphrase = Globals.getSslKeystorePassword().toCharArray(); + KeyStore keystore = KeyStore.getInstance( "JKS" ); + keystore.load( new FileInputStream( pathToKeyStore ), passphrase ); + KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() ); + kmf.init( keystore, passphrase ); + sslContext = SSLContext.getInstance( "SSLv3" ); + KeyManager[] keyManagers = kmf.getKeyManagers(); + sslContext.init( keyManagers, null, null ); + listener = new Listener( eventHandler, sslContext, Globals.getSslSocketPort() ); + listener.start(); + } catch ( FileNotFoundException e ) { log.error( "Could not find keystore." ); System.exit( 2 ); } catch ( KeyStoreException e ) { @@ -89,36 +77,37 @@ public class ConnectionHandler implements IncomingEvent System.exit( 2 ); } } - + /** * Add a new connection with a unique token. - * Tp up- or download the file in filepath. + * To up- or download the file in file path. + * * @param token The unique token * @param filepath The file to up- or download * @param type True if upload or false if download - * @return The port that was used for this connection. + * @return The created connection */ - public static int addConnection(String token, String filepath, boolean type) + public static Connection addConnection( String token, String filepath, boolean type ) { - int port = possiblePorts.remove( 0 ); //TODO: handle if no ports are left - Listener listener = new Listener( eventHandler, sslContext, port ); - - listener.start(); - - activeListeners.put( token, new ConnectionData(filepath, type, listener) ); - return port; + log.debug( "Added connection (" + ( ( type ) ? "uploading" : "downloading" ) + ") with token: '" + token + "'" ); + + Connection connection = new Connection( filepath, type ); + synchronized ( connections ) { + connections.put( token, connection ); + } + return connection; } - + public static boolean hasConnection( String token ) { - return activeListeners.containsKey( token ); + return connections.containsKey( token ); } - + public static void removeConnection( String token ) { - Listener l = activeListeners.remove( token ).listenerThread; - l.interrupt(); - possiblePorts.add(l.getPort()); // add port back to possible's list + synchronized ( connections ) { + connections.remove( token ); // token is remove, so connections are rejected + } } /** @@ -130,54 +119,73 @@ public class ConnectionHandler implements IncomingEvent // try to read meta data while ( uploader.readMetaData() ) { String token = uploader.getToken(); - log.debug( "Got token :'" + token + "'"); + log.debug( "Got token :'" + token + "'" ); // check token to identify the client - if ( !activeListeners.containsKey( token )) { + if ( !connections.containsKey( token ) ) { uploader.sendErrorCode( "Token not accepted." ); uploader.close(); return; } - + // check if he was a downloading client - if ( activeListeners.get( token ).type == ConnectionData.UPLOADING ) { - uploader.sendErrorCode( "You can not download, if you are uploading." ); // TODO: Why not? + if ( connections.get( token ).type == Connection.UPLOADING ) { + uploader.sendErrorCode( "You can not download, if you are uploading." ); uploader.close(); return; } // TODO: check which range needs to be sent and send this range - long length = ( new File( activeListeners.get( token ).filepath ) ).length(); - - uploader.sendRange(0, (int)length); - uploader.sendFile( activeListeners.get( token ).filepath ); + long length = ( new File( connections.get( token ).filepath ) ).length(); + + uploader.sendRange( 0, (int)length ); + uploader.sendFile( connections.get( token ).filepath ); } uploader.close(); } - + /** * Server is downloading - client is uploading! */ @Override public void incomingDownloader( Downloader downloader ) throws IOException { + int startOfRange = 0; + int endOfRange = 0; + int diffOfRange = 0; + String token = ""; // try to read meta data while ( downloader.readMetaData() ) { // check token to identify the client - String token = downloader.getToken(); - if ( !activeListeners.containsKey( token ) ) { + token = downloader.getToken(); + if ( !connections.containsKey( token ) ) { downloader.sendErrorCode( "Token not accepted." ); downloader.close(); return; } + + // get range + startOfRange = downloader.getStartOfRange(); + endOfRange = downloader.getEndOfRange(); + diffOfRange = downloader.getDiffOfRange(); + // check if he was a uploading client - if ( activeListeners.get( token ).type == ConnectionData.DOWNLOADING ) { + if ( connections.get( token ).type == Connection.DOWNLOADING ) { downloader.sendErrorCode( "You can not upload, if you are downloading." ); downloader.close(); return; } - - downloader.setOutputFilename( activeListeners.get( token ).filepath ); + + downloader.setOutputFilename( connections.get( token ).filepath ); downloader.readBinary(); } downloader.close(); + + // calculate and register the incoming blocks + if ( diffOfRange == 0 ) + return; + + for ( int i = startOfRange / Globals.blockSize; i < endOfRange / Globals.blockSize; i += Globals.blockSize ) { + connections.get( token ).image.addBlockToCheck( i ); + } + } } diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClient.java b/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClient.java new file mode 100644 index 0000000..0acfa42 --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClient.java @@ -0,0 +1,75 @@ +package org.openslx.imagemaster.serverconnection; + +import java.util.HashMap; +import java.util.List; + +/** + * Helper class for the ImageProcessor to know some things about the downloading client + * + */ +public class DownloadingClient +{ + + public final HashMap downloadingImages = new HashMap<>(); + + DownloadingClient() + { + } + + public void addDownload( String uuid, List list, String token ) + { + downloadingImages.put( uuid, new ImageInfos( uuid, list, token ) ); + } + + public void removeDownload( String uuid ) + { + downloadingImages.remove( uuid ); + } + + public boolean isDownloading( String uuid ) + { + return downloadingImages.containsKey( uuid ); + } + + public boolean hasDownloads() + { + return (downloadingImages.size() > 0); + } + + public List getLastRequestedBlocks( String uuid ) + { + if ( !downloadingImages.containsKey( uuid ) ) + return null; + return downloadingImages.get( uuid ).lastRequestedBlocks; + } + + public void requestBlocks( String uuid, List list ) + { + if ( !downloadingImages.containsKey( uuid ) ) + return; + downloadingImages.get( uuid ).lastRequestedBlocks = list; + } + + public String getToken( String uuid ) + { + if ( !downloadingImages.containsKey( uuid ) ) + return null; + return downloadingImages.get( uuid ).token; + } + + class ImageInfos + { + + public final String uuid; + public final String token; + private List lastRequestedBlocks; + + ImageInfos(String uuid, List list, String token) + { + this.uuid = uuid; + this.lastRequestedBlocks = list; + this.token = token; + } + } + +} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClientInfos.java b/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClientInfos.java deleted file mode 100644 index 4735938..0000000 --- a/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClientInfos.java +++ /dev/null @@ -1,82 +0,0 @@ -package org.openslx.imagemaster.serverconnection; - -import java.util.HashMap; -import java.util.List; - -/** - * Helper class for the ImageProcessor to know some things about the downloading client - * - */ -public class DownloadingClientInfos -{ - - public final HashMap downloadingImages = new HashMap<>(); - - DownloadingClientInfos() - { - } - - public void addDownload( String uuid, int port, List list, String token ) - { - downloadingImages.put( uuid, new ImageInfos( uuid, port, list, token ) ); - } - - public void removeDownload( String uuid ) - { - downloadingImages.remove( uuid ); - } - - public boolean isDownloading( String uuid ) - { - return downloadingImages.containsKey( uuid ); - } - - public boolean hasDownloads() - { - return (downloadingImages.size() > 0); - } - - public List getLastRequestedBlocks( String uuid ) - { - if ( !downloadingImages.containsKey( uuid ) ) - return null; - return downloadingImages.get( uuid ).lastRequestedBlocks; - } - - public void requestBlocks( String uuid, List list ) - { - if ( !downloadingImages.containsKey( uuid ) ) - return; - downloadingImages.get( uuid ).lastRequestedBlocks = list; - } - - public String getToken( String uuid ) - { - if ( !downloadingImages.containsKey( uuid ) ) - return null; - return downloadingImages.get( uuid ).token; - } - - public int getPort( String uuid ) - { - return downloadingImages.get( uuid ).port; - } - - class ImageInfos - { - - public final String uuid; - public final String token; - public final int port; - private List lastRequestedBlocks; - - ImageInfos(String uuid, int port, List list, String token) - { - this.uuid = uuid; - this.lastRequestedBlocks = list; - this.token = token; - this.port = port; - } - } - -} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java b/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java index ec043fa..8306280 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java @@ -2,10 +2,13 @@ package org.openslx.imagemaster.serverconnection; import java.io.IOException; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.openslx.imagemaster.Globals; @@ -34,8 +37,8 @@ public class ImageProcessor * Key: imageUUID, * Value: uploadingImageInfos */ - private static HashMap uploadingImages = new HashMap<>(); - + private static Map uploadingImages = new ConcurrentHashMap<>(); + /** * The UUIDs of the images that need to be checked by the crc checker. */ @@ -46,7 +49,7 @@ public class ImageProcessor * Key: serverSessionId * Value: downloadingClientInfos */ - private static HashMap downloadingClients = new HashMap<>(); + private static HashMap downloadingClients = new HashMap<>(); /** * Checks if this image is already uploading and returns a new list with missing blocks if so. @@ -56,63 +59,67 @@ public class ImageProcessor * @param imageData The data of the image * @return */ - public static UploadInfos getUploadInfos( String serverSessionId, ImageData imageData, List crcSums) + public static UploadInfos getUploadInfos( String serverSessionId, ImageData imageData, List crcSums ) { // check image data // TODO: do security checks - String uuid = imageData.uuid; + log.debug( "Trying to answer request:" + serverSessionId + ", " + imageData + ", " + crcSums ); - // check if image is already uploading TODO: what if two clients call this at the same time? -> Good question.. (Thought about not sending the last requested. But then the upload will never finish...) - if ( uploadingImages.containsKey( uuid ) ) { - List missing = getMissingBlocks( uuid, AMOUNT ); - if ( missing.isEmpty() ) { - uploadDone( uuid ); - return new UploadInfos( null, 0, missing ); - } - uploadingImages.get( uuid ).addNotCheckedBlocks( missing ); - UploadingImageInfos image = uploadingImages.get( uuid ); - return new UploadInfos( image.getToken(), image.getPort(), missing ); - } + String uuid = imageData.uuid; + String token; + String filepath; + String crcPath; + + List allBlocks; + UploadingImage image; - // insert new image and start listener synchronized ( uploadingImages ) { - String crcPath = Globals.getImageDir() + "/" + uuid + ".crc"; - try { - CRCFile crcFile = new CRCFile( crcSums, crcPath); - } catch (IOException e) { - return null; // TODO: what to do if we can not write the crc file to disk? + // check if image is already uploading TODO: what if two clients call this at the same time? -> Good question.. (Thought about not sending the last requested. But then the upload will never finish...) + if ( ( image = uploadingImages.get( uuid ) ) != null ) { + log.debug( "Image is already uploading" ); + List missing = getNMissingBlocks( image, AMOUNT ); + if ( missing.isEmpty() ) { + uploadDone( uuid ); + return new UploadInfos( null, 0, missing ); + } + return new UploadInfos( image.getToken(), Globals.getSslSocketPort(), missing ); } + + // insert new image + log.debug( "Inserting new download" ); + filepath = Globals.getImageDir() + "/" + uuid + ".vmdk"; + token = RandomString.generate( 100, false ); + crcPath = Globals.getImageDir() + "/" + uuid + ".crc"; int nBlocks = (int)Math.ceil( imageData.fileSize / Globals.blockSize ); - List allBlocks = new LinkedList<>(); - for ( int i = 0; i < nBlocks; i++ ) { // fill empty list with all block numbers + allBlocks = new ArrayList<>( nBlocks ); + for ( int i = nBlocks - 1; i >= 0; i-- ) { // fill empty list with all block numbers allBlocks.add( i ); } - String token = RandomString.generate( 100, false ); - String filepath = Globals.getImageDir() + "/" + uuid + ".vmdk"; - int port = ConnectionHandler.addConnection( token, filepath, ConnectionData.UPLOADING ); - // TODO: proper synchronization, interface is multi threaded. - // should synchronize operations on the map (use concurrent map) and then synchronize on the uploading image - // when handing the missing blocks etc... - uploadingImages.put( uuid, new UploadingImageInfos( token, port, allBlocks, serverSessionId, new Timestamp( System.currentTimeMillis() ), uuid, filepath, crcPath ) ); - DbImage.insert( imageData, System.currentTimeMillis(), token, allBlocks, serverSessionId, filepath ); - imagesToCheck.add( uuid ); - - List missing = getMissingBlocks( uuid, AMOUNT ); - if ( missing.isEmpty() ) { - // TODO: if this is empty, check if there are pending blocks and if so, request them again - uploadDone( uuid ); - } - uploadingImages.get( uuid ).addNotCheckedBlocks( missing ); - return new UploadInfos( token, port, missing ); + image = new UploadingImage( token, allBlocks, new Timestamp( System.currentTimeMillis() ), uuid, filepath, crcPath ); + uploadingImages.put( uuid, image ); } + + try { + new CRCFile( crcSums, crcPath); + } catch (IOException e) { + log.debug( "Could not create crc file" ); + return null; // TODO: what to do if we can not write the crc file to disk? Give object to crcscheduler? + } + + ConnectionHandler.addConnection( token, filepath, Connection.UPLOADING ).image = image; + DbImage.insert( imageData, System.currentTimeMillis(), token, allBlocks, serverSessionId, filepath ); + imagesToCheck.add( uuid ); + + log.debug( "Returning UploadInfos" ); + return new UploadInfos( token, Globals.getSslSocketPort(), getNMissingBlocks( image, AMOUNT ) ); } public static DownloadInfos getDownloadInfos( String serverSessionId, String uuid, List requestedBlocks ) { // check if server is already downloading if ( downloadingClients.containsKey( serverSessionId ) ) { - DownloadingClientInfos client = downloadingClients.get( serverSessionId ); + DownloadingClient client = downloadingClients.get( serverSessionId ); // remove download if done if ( requestedBlocks.isEmpty() ) @@ -126,36 +133,36 @@ public class ImageProcessor // client was downloading this image // update the requested blocks client.requestBlocks( uuid, requestedBlocks ); - return new DownloadInfos( client.getToken( uuid ), client.getPort( uuid ) ); + return new DownloadInfos( client.getToken( uuid ), Globals.getSslSocketPort() ); } // server was downloading another image and now gets a new connection for this new download String token = RandomString.generate( 100, false ); String filepath = DbImage.getImageByUUID( uuid ).imagePath; - int port = ConnectionHandler.addConnection( token, filepath, ConnectionData.DOWNLOADING ); + ConnectionHandler.addConnection( token, filepath, Connection.DOWNLOADING ); - client.addDownload( uuid, port, requestedBlocks, token ); + client.addDownload( uuid, requestedBlocks, token ); downloadingClients.put( serverSessionId, client ); - return new DownloadInfos( token, port ); + return new DownloadInfos( token, Globals.getSslSocketPort() ); } // insert new client and start listener synchronized ( downloadingClients ) { - DownloadingClientInfos client = new DownloadingClientInfos(); + DownloadingClient client = new DownloadingClient(); String token = RandomString.generate( 100, false ); String filepath = DbImage.getImageByUUID( uuid ).imagePath; - int port = ConnectionHandler.addConnection( token, filepath, ConnectionData.DOWNLOADING ); + ConnectionHandler.addConnection( token, filepath, Connection.DOWNLOADING ); - client.addDownload( uuid, port, requestedBlocks, token ); + client.addDownload( uuid, requestedBlocks, token ); downloadingClients.put( serverSessionId, client ); - return new DownloadInfos( token, port ); + return new DownloadInfos( token, Globals.getSslSocketPort() ); } } private static void downloadDone( String serverSessionId, String uuid ) { synchronized ( downloadingClients ) { - DownloadingClientInfos client = downloadingClients.get( serverSessionId ); + DownloadingClient client = downloadingClients.get( serverSessionId ); client.removeDownload( uuid ); ConnectionHandler.removeConnection( client.getToken( uuid ) ); if ( !client.hasDownloads() ) { @@ -171,23 +178,18 @@ public class ImageProcessor * @param amount The amount of blocks that you want to get * @return The missing blocks */ - private static List getMissingBlocks( String imageUUID, int amount ) + private static List getNMissingBlocks( UploadingImage image, int amount ) { - UploadingImageInfos image = uploadingImages.get( imageUUID ); - List list = image.getMissingBlocks(); - List result = new LinkedList<>(); + int size = image.amountOfMissingBlocks(); + if ( amount > size ) + amount = size; - if ( amount > list.size() ) - amount = list.size(); + List result = new ArrayList<>( amount ); - for ( int i = 0; i < amount; i++ ) { - result.add( list.get( i ) ); + for ( int i = 1; i <= amount; i++ ) { + result.add( image.removeMissingBlock( size - i ) ); } - - synchronized ( image ) { - image.addNotCheckedBlocks( result ); - } - + return result; } @@ -199,10 +201,11 @@ public class ImageProcessor */ private static void uploadDone( String uuid ) { - synchronized (imagesToCheck) { + synchronized ( imagesToCheck ) { imagesToCheck.remove( uuid ); } - UploadingImageInfos image; + + UploadingImage image; synchronized ( uploadingImages ) { image = uploadingImages.remove( uuid ); } @@ -211,18 +214,18 @@ public class ImageProcessor // remove the connection so that it can be used by a new client ConnectionHandler.removeConnection( image.getToken() ); } - - public static List getImagesToCheck() + + public static List getImagesToCheck() { - List result = new LinkedList<>(); + List result = new LinkedList<>(); Iterator iter = imagesToCheck.iterator(); while ( iter.hasNext() ) { result.add( uploadingImages.get( iter.next() ) ); } - + return result; } - + /** * Checks pending uploads in database and adds them to process list again. */ @@ -231,8 +234,9 @@ public class ImageProcessor List list = DbImage.getUploadingImages(); for ( DbImage image : list ) { String token = image.token; - int port = ConnectionHandler.addConnection( token, image.imagePath, ConnectionData.UPLOADING ); - UploadingImageInfos infos = new UploadingImageInfos( token, port, image.missingBlocks, image.serverSessionId, image.timestamp, image.uuid, image.imagePath, Globals.getImageDir() + "/" + image.uuid + ".crc" ); + ConnectionHandler.addConnection( token, image.imagePath, Connection.UPLOADING ); + UploadingImage infos = new UploadingImage( token, image.missingBlocks, image.timestamp, + image.uuid, image.imagePath, Globals.getImageDir() + "/" + image.uuid + ".crc" ); uploadingImages.put( image.uuid, infos ); } log.info( "Added " + list.size() + " pending upload(s) to process list again." ); diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImage.java b/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImage.java new file mode 100644 index 0000000..e6acb78 --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImage.java @@ -0,0 +1,114 @@ +package org.openslx.imagemaster.serverconnection; + +import java.sql.Timestamp; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import org.apache.log4j.Logger; +import org.openslx.imagemaster.db.DbImage; + +/** + * Helper class for ImageProcessor to save some infos about the images in the process list. + */ +public class UploadingImage +{ + public static final Logger log = Logger.getLogger( UploadingImage.class ); + /** + * Token for the satellite. + */ + private String token; + /** + * The missing blocks that need to be uploaded by the satellite. + */ + private List missingBlocks; + /** + * The list of blocks that were requested but not checked + */ + private List blocksToCheck = new LinkedList<>(); + private Timestamp ts; // when did the server something for the last time + private DbImage dbImage = null; // the DB representation of this image + private String uuid; + private String filename; + private String crcFilename; + + protected UploadingImage(String token, List missingBlocks, Timestamp ts, String uuid, String filename, String crcFilename) + { + this.token = token; + this.missingBlocks = missingBlocks; + this.ts = ts; + this.uuid = uuid; + this.crcFilename = crcFilename; + log.debug(missingBlocks); + } + + protected void removeBlock( int number ) + { + this.missingBlocks.remove( number ); + dbImage.updateMissingBlocks( missingBlocks ); + } + + protected void removeBlocks( Collection list ) + { + this.missingBlocks.removeAll( list ); + dbImage.updateMissingBlocks( missingBlocks ); + } + + protected void addBlockToCheck( int number ) + { + synchronized ( blocksToCheck ) { + blocksToCheck.add( number ); + } + log.debug( number + " added to check list..." ); + } + + protected List getNotCheckedBlocks() + { + return this.blocksToCheck; + } + + protected String getToken() + { + return this.token; + } + + protected List getAllMissingBlocks() + { + return this.missingBlocks; + } + + protected Timestamp getTimestamp() + { + return this.ts; + } + + protected DbImage getDbImage() + { + if ( dbImage == null ) { + dbImage = DbImage.getImageByUUID( this.uuid ); + } + return this.dbImage; + } + + protected String getFilename() + { + return this.filename; + } + + protected String getCrcFilename() + { + return this.crcFilename; + } + + public int amountOfMissingBlocks() + { + return missingBlocks.size(); + } + + public int removeMissingBlock( int index ) + { + synchronized ( missingBlocks ) { + return missingBlocks.remove( index ); + } + } +} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImageInfos.java b/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImageInfos.java deleted file mode 100644 index 71e0b46..0000000 --- a/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImageInfos.java +++ /dev/null @@ -1,115 +0,0 @@ -package org.openslx.imagemaster.serverconnection; - -import java.sql.Timestamp; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; - -import org.openslx.imagemaster.db.DbImage; - -/** - * Helper class for ImageProcessor to save some infos about the images in the process list. - */ -public class UploadingImageInfos -{ - /** - * Token for the satellite. - */ - private String token; - /** - * The port where the server is listening. - */ - private int port; - /** - * The missing blocks that need to be uploaded by the satellite. - */ - private List missingBlocks; - // TODO: Do we have synchronization with the db yet? The list of missing blocks should - // be written to the DB periodically so when the server restarts we know which blocks are already - // complete... - /** - * The list of blocks that the satellite received last. - * (This could be used to tell the CRCChecker to check these blocks. - */ - private List notCheckedBlocks = new LinkedList<>(); - private String serverSessionId; - private Timestamp ts; // when did the server something for the last time - private DbImage dbImage = null; // the DB representation of this image - private String uuid; - private String filename; - private String crcFilename; - - protected UploadingImageInfos(String token, int port, List missingBlocks, String serverSessionId, Timestamp ts, String uuid, String filename, String crcFilename) - { - this.token = token; - this.missingBlocks = missingBlocks; - this.serverSessionId = serverSessionId; - this.ts = ts; - this.uuid = uuid; - this.crcFilename = crcFilename; - } - - protected void removeBlock!result ) { - /*( int number ) - { - this.missingBlocks.remove( number ); - } - - protected void removeBlocks( Collection list ) - { - this.missingBlocks.removeAll( list ); - } - - protected void addNotCheckedBlocks( List list ) - { - this.notCheckedBlocks = list; - } - - protected List getNotCheckedBlocks() - { - return this.notCheckedBlocks; - } - - protected String getToken() - { - return this.token; - } - - protected List getMissingBlocks() - { - return this.missingBlocks; - } - - protected String getServerSessionId() - { - return this.serverSessionId; - } - - protected Timestamp getTimestamp() - { - return this.ts; - } - - protected DbImage getDbImage() - { - if ( dbImage == null ) { - dbImage = DbImage.getImageByUUID( this.uuid ); - } - return this.dbImage; - } - - protected String getFilename() - { - return this.filename; - } - - protected String getCrcFilename() - { - return this.crcFilename; - } - - public int getPort() - { - return this.port; - } -} diff --git a/src/main/java/org/openslx/imagemaster/util/ByteArray.java b/src/main/java/org/openslx/imagemaster/util/ByteArray.java new file mode 100644 index 0000000..71244a9 --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/util/ByteArray.java @@ -0,0 +1,13 @@ +package org.openslx.imagemaster.util; + + +public class ByteArray +{ + + public final byte[] array; + + public ByteArray(byte[] array) + { + this.array = array; + } +} \ No newline at end of file -- cgit v1.2.3-55-g7522