diff options
author | Simon Rettberg | 2014-09-25 14:50:56 +0200 |
---|---|---|
committer | Simon Rettberg | 2014-09-25 14:50:56 +0200 |
commit | 8b87677709624a56a7557104dc31ee8cc2ece748 (patch) | |
tree | 570d9bf3216ea9aa2c5c4158409ec3475e246963 /src/main/java/org/openslx/imagemaster/serverconnection | |
parent | Adapted classes to new filetransfer. (diff) | |
download | masterserver-8b87677709624a56a7557104dc31ee8cc2ece748.tar.gz masterserver-8b87677709624a56a7557104dc31ee8cc2ece748.tar.xz masterserver-8b87677709624a56a7557104dc31ee8cc2ece748.zip |
Adapt to changed Thrift RPC
Diffstat (limited to 'src/main/java/org/openslx/imagemaster/serverconnection')
6 files changed, 232 insertions, 540 deletions
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/Connection.java b/src/main/java/org/openslx/imagemaster/serverconnection/Connection.java deleted file mode 100644 index bfdb1bb..0000000 --- a/src/main/java/org/openslx/imagemaster/serverconnection/Connection.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.openslx.imagemaster.serverconnection; - -/** - * This represents a connection. - * (When a satellite is up- or downloading an image from/to masterserver). - * It is used to help the ConnectionHandler and is storing infos. - */ -public class Connection -{ - /** - * Where the file is stored locally. - */ - protected final String filepath; - - /** - * Type of this connection. - * True if uploading, false if downloading - */ - protected final boolean type; - public UploadingImage image = null; - public DownloadingClient client = null; - public final static boolean UPLOADING = true; - public final static boolean DOWNLOADING = false; - - /** - * Create a new connection data - * @param filepath Where the file is stored locally - * @param type True if uploading, false if downloading - * @param listenerThread The active listener thread that listens for incoming connections - */ - protected Connection(String filepath, boolean type) - { - this.filepath = filepath; - this.type = type; - } -} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java index 0fb52f5..e6319c9 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java @@ -1,6 +1,5 @@ package org.openslx.imagemaster.serverconnection; -import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -10,14 +9,18 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; +import org.apache.commons.lang.mutable.MutableInt; import org.apache.log4j.Logger; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.FileRange; @@ -26,6 +29,7 @@ import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; import org.openslx.filetransfer.WantRangeCallback; import org.openslx.imagemaster.Globals; +import org.openslx.imagemaster.db.DbImage; /** * Class to handle all incoming and outgoing connections. @@ -37,8 +41,11 @@ public class ConnectionHandler implements IncomingEvent private static Logger log = Logger.getLogger( ConnectionHandler.class ); private static SSLContext sslContext; - private static Map<String, Connection> connections = new ConcurrentHashMap<>(); + private static Map<String, UploadingImage> pendingIncomingUploads = new ConcurrentHashMap<>(); + private static Map<String, DbImage> pendingIncomingDownloads = new ConcurrentHashMap<>(); private static IncomingEvent eventHandler = new ConnectionHandler(); + private static ThreadPoolExecutor uploadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() ); + private static ThreadPoolExecutor downloadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() ); private static Listener listener; @@ -81,153 +88,120 @@ public class ConnectionHandler implements IncomingEvent } /** - * Add a new connection with a unique token. - * To up- or download the file in file path. + * Add a new allowed incoming upload connection + * for the given token and image. * * @param token The unique token - * @param filepath The file to up- or download - * @param type True if upload or false if download - * @return The created connection + * @param image Image being uploaded */ - public static Connection addConnection( String token, String filepath, boolean type ) + public static void addUpload( String token, UploadingImage image ) { - log.debug( "Added connection (" + ( ( type ) ? "uploading" : "downloading" ) + ") with token: '" + token + "'" ); - - Connection connection = new Connection( filepath, type ); - synchronized ( connections ) { - connections.put( token, connection ); - } - return connection; - } - - public static boolean hasConnection( String token ) - { - return connections.containsKey( token ); + pendingIncomingUploads.put( token, image ); + log.debug( "Added upload" ); } - public static void removeConnection( String token ) + /** + * Add a new allowed incoming download connection + * for the given token and image. + * + * @param token The unique token + * @param image Image being uploaded + */ + public static void addDownload( String token, DbImage image ) { - synchronized ( connections ) { - connections.remove( token ); // token is remove, so connections are rejected - } + pendingIncomingDownloads.put( token, image ); + log.debug( "Added download" ); } /** * Server is uploading - client is downloading! */ @Override - public void incomingUploader( Uploader uploader ) throws IOException + public void incomingUploader( final Uploader uploader ) { String token = uploader.getToken(); - log.debug( "Got token :'" + token + "'" ); // check token to identify the client - if (token == null) - { + if ( token == null ) { uploader.sendErrorCode( "No token available." ); - uploader.close(null); + uploader.close( null ); return; } - if ( !connections.containsKey( token ) ) { + + final DbImage image = pendingIncomingDownloads.remove( token ); + if ( image == null ) { uploader.sendErrorCode( "Token not accepted." ); - uploader.close(null); + uploader.close( null ); return; } - // check if he was a downloading client - if ( connections.get( token ).type == Connection.UPLOADING ) { - uploader.sendErrorCode( "You can not download, if you are uploading." ); - uploader.close(null); - return; + try { + uploadPool.execute( new Runnable() { + @Override + public void run() + { + uploader.upload( image.getAbsolutePath() ); + } + } ); + } catch ( RejectedExecutionException e ) { + uploader.sendErrorCode( "Too many concurrent uploads." ); + uploader.close( null ); } - - String fileName = connections.get( token ).filepath; - uploader.upload(fileName); } /** * Server is downloading - client is uploading! */ @Override - public void incomingDownloader( Downloader downloader ) throws IOException + public void incomingDownloader( final Downloader downloader ) throws IOException { log.debug( "Client wants to upload" ); String token = downloader.getToken(); - if (token == null) + if ( token == null ) { downloader.sendErrorCode( "No token available." ); - downloader.close(null); + downloader.close( null ); return; } - // Check token to identify the client. - if ( !connections.containsKey( token ) ) { + + final UploadingImage image = pendingIncomingUploads.remove( token ); + if ( image == null ) { downloader.sendErrorCode( "Token not accepted." ); - downloader.close(null); + downloader.close( null ); return; } - - // check if he was a uploading client - if ( connections.get( token ).type == Connection.DOWNLOADING ) { - downloader.sendErrorCode( "You can not upload, if you are downloading." ); - downloader.close(null); - return; + final MutableInt lastBlock = new MutableInt( -1 ); + + try { + downloadPool.execute( new Runnable() { + @Override + public void run() + { + downloader.download( image.getAbsolutePath(), new WantRangeCallback() { + @Override + public FileRange get() + { + if ( lastBlock.intValue() != -1 ) { + image.setNeedsCheck( lastBlock.intValue() ); + image.increaseTransmittedTimes( lastBlock.intValue() ); + } + // get start of range. + int blockNumber = image.getNextMissingBlock(); + if ( blockNumber == -1 ) + return null; + lastBlock.setValue( blockNumber ); + log.debug( "Block " + blockNumber + " was transmitted " + image.getTimesTransmitted( blockNumber ) + " time(s)." ); + + long startOfRange = image.getNextMissingBlock() * Globals.blockSize; + long endOfRange = Math.min( startOfRange + Globals.blockSize, image.getFileSize() ); + FileRange range = new FileRange( startOfRange, endOfRange ); + return range; + } + } ); + } + } ); + } catch ( RejectedExecutionException e ) { + downloader.sendErrorCode( "Too many concurrent downloads." ); + downloader.close( null ); } - - String destinationFileName = connections.get( token ).filepath; - final UploadingImage image = connections.get( token ).image; - downloader.download( destinationFileName, new WantRangeCallback() { - - @Override - public FileRange get() { - // get start of range. - int blockNumber = image.getNextMissingBlock(); - if (blockNumber == -1) - return null; - - image.setNeedsCheck( blockNumber ); - image.increaseTransmittedTimes( blockNumber ); - log.debug( "Block " + blockNumber + " was transmitted " + image.getTimesTransmitted( blockNumber ) + " time(s)." ); - - long startOfRange = image.getNextMissingBlock() * Globals.blockSize; - long endOfRange = Math.min(startOfRange + Globals.blockSize, image.getImageFile().length()); - FileRange range = new FileRange(startOfRange, endOfRange); - return range; - } - }); -// long startOfRange = 0; -// String token = ""; -// // try to read meta data -// while ( downloader.readMetaData() ) { -// // check token to identify the client -// token = downloader.getToken(); -// if ( !connections.containsKey( token ) ) { -// downloader.sendErrorCode( "Token not accepted." ); -// downloader.close(null); -// return; -// } -// -// startOfRange = downloader.getStartOfRange(); -// -// if ( downloader.getDiffOfRange() <= 0 ) { -// return; -// } -// -// // check if he was a uploading client -// if ( connections.get( token ).type == Connection.DOWNLOADING ) { -// downloader.sendErrorCode( "You can not upload, if you are downloading." ); -// downloader.close(null); -// return; -// } -// -// -// int blockNumber = (int) ( startOfRange / Globals.blockSize ); -// UploadingImage image = connections.get( token ).image; -// image.setNeedsCheck( blockNumber ); -// image.increaseTransmittedTimes( blockNumber ); -// log.debug( "Block " + blockNumber + " was transmitted " + image.getTimesTransmitted( blockNumber ) + " time(s)." ); -// -// downloader.setOutputFilename( connections.get( token ).filepath ); -// downloader.receiveBinary(); -// } - downloader.close(null); } } diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/CrcScheduler.java b/src/main/java/org/openslx/imagemaster/serverconnection/CrcScheduler.java index 1ad921b..ff1374c 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/CrcScheduler.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/CrcScheduler.java @@ -29,7 +29,7 @@ public class CrcScheduler extends TimerTask // iterate over the uploading images that need to be checked while ( iter.hasNext() ) { UploadingImage image = iter.next(); - log.debug( "Checking blocks of " + image.getDbImage().imageName ); + log.debug( "Checking blocks of " + image.getUuid() ); CrcChecker crcChecker = new CrcChecker( image.getImageFile(), image.getCrcFile() ); log.debug( "CRCFile is valid: " + crcChecker.hasValidCrcFile() ); if ( !crcChecker.hasValidCrcFile() ) { diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClient.java b/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClient.java deleted file mode 100644 index 630096f..0000000 --- a/src/main/java/org/openslx/imagemaster/serverconnection/DownloadingClient.java +++ /dev/null @@ -1,71 +0,0 @@ -package org.openslx.imagemaster.serverconnection; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Helper class for the ImageProcessor and ConnectionHandler to know some things about the downloading client - */ -public class DownloadingClient -{ - - private final HashMap<String, ImageInfos> downloadingImages = new HashMap<>(); - - public void addDownload( String uuid, List<Integer> list, String token ) - { - downloadingImages.put( uuid, new ImageInfos( uuid, list, token ) ); - } - - public void removeDownload( String uuid ) - { - downloadingImages.remove( uuid ); - } - - public boolean isDownloading( String uuid ) - { - return downloadingImages.containsKey( uuid ); - } - - public boolean hasDownloads() - { - return (downloadingImages.size() > 0); - } - - public List<Integer> getLastRequestedBlocks( String token ) - { - for (Map.Entry<String, ImageInfos> entry : downloadingImages.entrySet() ) { - if (entry.getValue().token.equals( token )) return entry.getValue().lastRequestedBlocks; - } - return null; - } - - public void requestBlocks( String uuid, List<Integer> list ) - { - if ( !downloadingImages.containsKey( uuid ) ) - return; - downloadingImages.get( uuid ).lastRequestedBlocks = list; - } - - public String getToken( String uuid ) - { - if ( !downloadingImages.containsKey( uuid ) ) - return null; - return downloadingImages.get( uuid ).token; - } - - class ImageInfos - { - public final String uuid; - public final String token; - private List<Integer> lastRequestedBlocks; - - ImageInfos(String uuid, List<Integer> list, String token) - { - this.uuid = uuid; - this.lastRequestedBlocks = list; - this.token = token; - } - } - -} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java b/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java index d5e253c..739bc62 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ImageProcessor.java @@ -1,9 +1,7 @@ package org.openslx.imagemaster.serverconnection; import java.io.File; -import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -15,13 +13,13 @@ import org.openslx.imagemaster.Globals; import org.openslx.imagemaster.crcchecker.CrcFile; import org.openslx.imagemaster.db.DbImage; import org.openslx.imagemaster.db.DbUser; -import org.openslx.imagemaster.thrift.iface.DownloadInfos; +import org.openslx.imagemaster.thrift.iface.DownloadData; import org.openslx.imagemaster.thrift.iface.ImageData; import org.openslx.imagemaster.thrift.iface.ImageDataError; import org.openslx.imagemaster.thrift.iface.ImageDataException; +import org.openslx.imagemaster.thrift.iface.UploadData; import org.openslx.imagemaster.thrift.iface.UploadError; import org.openslx.imagemaster.thrift.iface.UploadException; -import org.openslx.imagemaster.thrift.iface.UploadInfos; import org.openslx.imagemaster.util.RandomString; import org.openslx.imagemaster.util.Util; @@ -35,10 +33,6 @@ public class ImageProcessor private static final Logger log = Logger.getLogger( ImageProcessor.class ); /** - * The amount of blocks that is return in UploadInfos (after request of satellite) - */ - private static final int AMOUNT = 20; - /** * The uploading images. * Key: imageUUID, * Value: uploadingImageInfos @@ -48,14 +42,7 @@ public class ImageProcessor /** * The UUIDs of the images that need to be checked by the crc checker. */ - private static List<String> imagesToCheck = new LinkedList<>(); - - /** - * The downloading clients. - * Key: serverSessionId - * Value: downloadingClientInfos - */ - private static HashMap<String, DownloadingClient> downloadingClients = new HashMap<>(); + private static List<String> imagesToCheck = new ArrayList<>(); /** * Checks if this image is already uploading and returns a new list with missing blocks if so. @@ -66,218 +53,81 @@ public class ImageProcessor * @return * @throws UploadException If some error occurred during the process */ - public static UploadInfos getUploadInfos( String serverSessionId, ImageData imageData, List<Integer> crcSums ) + public static UploadData getUploadInfos( ImageData imageData, List<Integer> crcSums ) throws UploadException, ImageDataException { // check image data - if ( imageData.imageName == null || imageData.imageName.isEmpty() ) { + if ( imageData.imageName == null || imageData.imageName.isEmpty() ) throw new ImageDataException( ImageDataError.INVALID_DATA, "Image name not set." ); - } else if ( imageData.imageOwner == null || imageData.imageOwner.isEmpty() ) { - throw new ImageDataException( ImageDataError.INVALID_DATA, "Image owner not set." ); - } else if ( imageData.contentOperatingSystem == null || imageData.contentOperatingSystem.isEmpty() ) { + if ( imageData.imageOwner == null || !DbUser.exists( imageData.imageOwner ) ) + throw new ImageDataException( ImageDataError.INVALID_DATA, "Invalid image owner." ); + if ( imageData.contentOperatingSystem == null || imageData.contentOperatingSystem.isEmpty() ) throw new ImageDataException( ImageDataError.INVALID_DATA, "Content operating system not set." ); - } else if ( imageData.fileSize <= 0 ) { + if ( imageData.fileSize <= 0 ) throw new ImageDataException( ImageDataError.INVALID_DATA, "File size is too small." ); - } else if ( !DbUser.exists( imageData.imageOwner ) ) { - throw new ImageDataException( ImageDataError.INVALID_DATA, "User is not known." ); - } - - //TODO: this is not working like this: - DbImage i = DbImage.getImageByUuid( imageData.uuid ); -// boolean isUpdate = false; -// if ( i != null ) { -// // image is already available -// // is the client updating?? -// if ( imageData.imageVersion <= i.imageVersion ) { -// throw new ImageDataException( ImageDataError.INVALID_DATA, "This image with the same or a newer version is already available." ); -// } else { -// // TODO: update db and prepare for new image file -// isUpdate = true; -// } -// } - log.debug( serverSessionId + " is submitting " + imageData.uuid ); + log.debug( "Satellite is submitting " + imageData.uuid ); - String uuid = imageData.uuid; - String token; - String filepath; - int nBlocks; - int[] allBlocks; + final String uuid = imageData.uuid; + final String filepathRelative; + final CrcFile crcFile = new CrcFile( crcSums ); UploadingImage image; synchronized ( uploadingImages ) { // check if image is already uploading - if ( ( image = uploadingImages.get( uuid ) ) != null ) { - if ( image.getCrcFile() == null ) { - CrcFile crcFile; - try { - // try to write crc file ... - crcFile = CrcFile.writeCrcFile( crcSums, generateFilepathOfCrcFile( imageData ) ); - } catch ( IOException e ) { - // ... and keep it in ram if it fails - crcFile = new CrcFile( crcSums ); - } - image.setCrcFile( crcFile ); + if ( ( image = uploadingImages.get( uuid ) ) == null ) { + // insert new image + if ( crcSums != null && !crcFile.isValid() ) + throw new UploadException( UploadError.INVALID_CRC, "CRC sums were invalid." ); + filepathRelative = generateFilepathOfImage( imageData ); + DbImage.insert( imageData, filepathRelative ); // Make sure it exists in DB + try { + image = new UploadingImage( uuid ); + } catch ( Exception e ) { + throw new UploadException( UploadError.GENERIC_ERROR, "Internal error" ); } - List<Integer> missing = getNMissingBlocks( image, AMOUNT ); - if ( missing.isEmpty() && image.allBlocksValid() ) { - uploadDone( uuid ); - return new UploadInfos( image.getToken(), Globals.getSslSocketPort(), missing, image.allBlocksValid() ); - } - return new UploadInfos( image.getToken(), Globals.getSslSocketPort(), missing, false ); - } - - // insert new image - if ( !CrcFile.sumsAreValid( crcSums ) ) - throw new UploadException( UploadError.INVALID_CRC, "CRC sums were invalid." ); - filepath = generateFilepathOfImage( imageData ); - token = RandomString.generate( 100, false ); - nBlocks = Util.getNumberOfBlocks( imageData.fileSize, Globals.blockSize ); - allBlocks = new int[ nBlocks ]; // initialize array with all zeros which mean that this block is missing - image = new UploadingImage( token, allBlocks, System.currentTimeMillis(), uuid, filepath ); - image.setDbImage( i ); // set the dbImage (it doesn't matter if the image is null because the uploadingImage is creating it then - uploadingImages.put( uuid, image ); - } - - CrcFile crcFile; - try { - // try to write crc file ... - crcFile = CrcFile.writeCrcFile( crcSums, generateFilepathOfCrcFile( imageData ) ); - } catch ( IOException e ) { - // ... and keep it in ram if it fails - crcFile = new CrcFile( crcSums ); - } - image.setCrcFile( crcFile ); - - ConnectionHandler.addConnection( token, filepath, Connection.UPLOADING ).image = image; -// if ( isUpdate ) { -// i.updateVersion( i.imageVersion, Util.getNumberOfBlocks( i.fileSize, Globals.blockSize ) ); -// } else { - DbImage.insert( imageData, System.currentTimeMillis(), token, nBlocks, serverSessionId, filepath ); -// } - imagesToCheck.add( uuid ); - log.debug( imagesToCheck ); - - log.debug( image.toString() ); - return new UploadInfos( token, Globals.getSslSocketPort(), getNMissingBlocks( image, AMOUNT ), false ); - } - - public static DownloadInfos getDownloadInfos( String serverSessionId, String uuid, List<Integer> requestedBlocks ) - { - // check if server is already downloading - if ( downloadingClients.containsKey( serverSessionId ) ) { - DownloadingClient client = downloadingClients.get( serverSessionId ); - - // remove download if done - if ( requestedBlocks.isEmpty() ) - { - downloadDone( serverSessionId, uuid ); - return new DownloadInfos(); - } - - if ( client.isDownloading( uuid ) ) - { - // client was downloading this image - // update the requested blocks - client.requestBlocks( uuid, requestedBlocks ); - return new DownloadInfos( client.getToken( uuid ), Globals.getSslSocketPort() ); + uploadingImages.put( uuid, image ); } - - // server was downloading another image and now gets a new connection for this new download - String token = RandomString.generate( 100, false ); - String filepath = DbImage.getImageByUuid( uuid ).imagePath; - ConnectionHandler.addConnection( token, filepath, Connection.DOWNLOADING ); - - client.addDownload( uuid, requestedBlocks, token ); - downloadingClients.put( serverSessionId, client ); - return new DownloadInfos( token, Globals.getSslSocketPort() ); } - // insert new client and start listener - synchronized ( downloadingClients ) { - String token = RandomString.generate( 100, false ); - String filepath = DbImage.getImageByUuid( uuid ).imagePath; - - DownloadingClient client = new DownloadingClient(); - client.addDownload( uuid, requestedBlocks, token ); - downloadingClients.put( serverSessionId, client ); + final String token = RandomString.generate( 50, false ); - ConnectionHandler.addConnection( token, filepath, Connection.DOWNLOADING ).client = client; - return new DownloadInfos( token, Globals.getSslSocketPort() ); - } + ConnectionHandler.addUpload( token, image ); + // Set crc file on image - if there is already a crc file assigned, this does nothing + image.setCrcFile( crcFile ); + if ( image.allBlocksValid() ) + removeFinishedUploads(); + return new UploadData( token, Globals.getSslSocketPort() ); } - private static void downloadDone( String serverSessionId, String uuid ) + public static DownloadData getDownloadInfos( String uuid ) throws ImageDataException { - synchronized ( downloadingClients ) { - DownloadingClient client = downloadingClients.get( serverSessionId ); - client.removeDownload( uuid ); - ConnectionHandler.removeConnection( client.getToken( uuid ) ); - if ( !client.hasDownloads() ) { - downloadingClients.remove( serverSessionId ); - } - } + DbImage image = DbImage.getImageByUuid( uuid ); + if ( image == null ) + throw new ImageDataException( ImageDataError.UNKNOWN_IMAGE, "UUID '" + uuid + "' does not map to a known image." ); + // server was downloading another image and now gets a new connection for this new download + String token = RandomString.generate( 50, false ); + ConnectionHandler.addDownload( token, image ); + return new DownloadData( token, Globals.getSslSocketPort(), null ); // TODO: Return crc list } /** - * Returns a specified number of missing blocks. - * - * @param imageUUID The image of which you want to get the missing blocks from - * @param amount The amount of blocks that you want to get - * @return The missing blocks - * @throws UploadException If a block was transmitted to many times. + * Go though list of active uploading images and remove + * those that are finished. */ - private static List<Integer> getNMissingBlocks( UploadingImage image, int amount ) throws UploadException + public static void removeFinishedUploads() { - int missing = image.getAmountOfBlocksNeedingRequest(); - log.debug( "The number of missing blocks: " + missing ); - if ( amount > missing ) - amount = missing; - List<Integer> result = new ArrayList<>( amount ); - - int got = 0; - for ( int i = 0; i < image.getNumberOfBlocks(); i++ ) { - if ( image.needsRequest( i ) ) { - int times = image.getTimesTransmitted( i ); - if ( times > Globals.getSslTransmitTimes() ) { - log.debug( "Block " + i + " is probably broken." ); - throw new UploadException( UploadError.BROKEN_BLOCK, "Block " + i + " was transmitted " - + times + " and is still not valid." ); - } - result.add( i ); - got++; + for (Iterator<UploadingImage> it = uploadingImages.values().iterator(); it.hasNext(); ) { + UploadingImage image = it.next(); + if (image.allBlocksValid()) { + synchronized ( imagesToCheck ) { + imagesToCheck.remove( image.getUuid() ); + image.updateMissingBlocks( null ); + } + it.remove(); } - if ( got == amount ) - break; - } - - log.debug( "Returned " + got + " missing blocks." ); - - return result; - } - - /** - * Is triggered when an upload of an image is done. - * Removes image from process list, updates db entry and moves file on hard drive. - * - * @param uuid - */ - private static void uploadDone( String uuid ) - { - synchronized ( imagesToCheck ) { - imagesToCheck.remove( uuid ); - log.debug( "Removing " + uuid ); - } - - UploadingImage image; - synchronized ( uploadingImages ) { - image = uploadingImages.remove( uuid ); } - image.getDbImage().updateMissingBlocks( null ); - // file was already downloaded in the right location by the updownloader class. - // remove the connection so that it can be used by a new client - ConnectionHandler.removeConnection( image.getToken() ); } public static List<UploadingImage> getImagesToCheck() @@ -297,28 +147,28 @@ public class ImageProcessor // TODO return null; } - + /** * Generates the filePath of an image. * And creates the folder if wanted. * The crc file is found under filePath + ".crc" * * @param imageData The data of the image - * @param createFolder If you want the folder to be created * @return The filePath of the given image */ public static String generateFilepathOfImage( ImageData imageData ) { return generateFilePathOfImage( imageData.uuid, imageData.imageName, imageData.imageVersion ); } - - public static String generateFilePathOfImage( String uuid, String imageName, int imageVersion) { - String result = Globals.getImageDir() + "/" + uuid + "/"; - File dir = new File(result); + + public static String generateFilePathOfImage( String uuid, String imageName, int imageVersion ) + { + String result = Util.sanitizeFileName( uuid ) + "/"; + File dir = new File( Globals.getImageDir() + "/" + result ); if ( !dir.exists() ) { dir.mkdirs(); } - result += imageName + "-v" + String.valueOf( imageVersion ) + ".vmdk"; + result += imageName + "-rev" + String.valueOf( imageVersion ); return result; } @@ -326,41 +176,10 @@ public class ImageProcessor { return generateFilepathOfImage( imageData ) + ".crc"; } - + public static String generateFilepathOfCrcFile( String uuid, String imageName, int imageVersion ) { return generateFilePathOfImage( uuid, imageName, imageVersion ) + ".crc"; } - /** - * Checks pending uploads in database and adds them to process list again. - */ - static - { - List<DbImage> list = DbImage.getUploadingImages(); - for ( DbImage image : list ) { - UploadingImage infos = new UploadingImage( image.token, image.blockStatus, image.timestamp.getTime(), image.uuid, image.imagePath ); - ConnectionHandler.addConnection( image.token, image.imagePath, Connection.UPLOADING ).image = infos; - CrcFile crcFile = new CrcFile( generateFilepathOfCrcFile( image.uuid, image.imageName, image.imageVersion ) ); // TODO: has to be adjusted with the corresponding value above - try { - if ( !crcFile.isValid() ) { - continue; - // UploadingImage object will contain a CRCFile = null which invokes the ImageProcessor to retry to save it - } - } catch ( IOException e ) { - continue; - // same thing here - } - infos.setCrcFile( crcFile ); - uploadingImages.put( image.uuid, infos ); - imagesToCheck.add( image.uuid ); - } - log.info( "Added " + list.size() + " pending upload(s) to process list again." ); - } - - - - public static void init() - { - } } diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImage.java b/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImage.java index 169877c..78d5f2b 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImage.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/UploadingImage.java @@ -1,5 +1,6 @@ package org.openslx.imagemaster.serverconnection; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -10,16 +11,14 @@ import org.openslx.imagemaster.crcchecker.ImageFile; import org.openslx.imagemaster.db.DbImage; /** - * Helper class for ImageProcessor and ConnectionHandler to save some infos about the images in the process list. + * Helper class for ImageProcessor and ConnectionHandler to save some infos about the images in the + * process list. */ public class UploadingImage { public static final Logger log = Logger.getLogger( UploadingImage.class ); - /** - * Token for the satellite. - */ - private String token; + /** * The status list of the blocks. * x = 0 block is missing @@ -27,177 +26,164 @@ public class UploadingImage * x > 0 block is invalid and was transmitted x times (needs request) * x < 0 block is invalid and was transmitted x times (needs check) */ - private int[] blockStatus = null; - public static final int valid = 200; - public static final int missing = 0; - private long timestamp; // when did the server something for the last time + private final int[] blockStatus; + /** + * Remember last position in blockStatus array that was returned, so we don't always + * iterate from the beginning. + */ + private int lastStatusPos = 0; + + public static final int VALID = 200; + public static final int MISSING = 0; + private DbImage dbImage = null; // the DB representation of this image - private String uuid; - private String filename; + /** + * Class for accessing the file (read blocks from it) + */ private ImageFile imageFile = null; private CrcFile crcFile = null; - protected UploadingImage(String token, int[] initialBlockStatus, long timestamp, String uuid, String filename) + protected UploadingImage( String uuid ) { - this.token = token; - this.timestamp = timestamp; - this.uuid = uuid; - this.blockStatus = initialBlockStatus; - this.filename = filename; + this.dbImage = DbImage.getImageByUuid( uuid ); + if ( this.dbImage == null ) + throw new RuntimeException( "Unknown image " + uuid + " on UploadingImage creation" ); + this.blockStatus = this.dbImage.blockStatus; } protected void setValid( int index ) { - if ( blockStatus == null ) - return; synchronized ( blockStatus ) { - blockStatus[index] = valid; + blockStatus[index] = VALID; } } protected void updateDb() { - if ( blockStatus == null ) - return; - List<Integer> missingBlocks = new ArrayList<>(); - synchronized ( blockStatus ) { for ( int block = 0; block < blockStatus.length; block++ ) { - if ( blockStatus[block] != valid ) { + if ( blockStatus[block] != VALID ) { missingBlocks.add( block ); } } } - getDbImage().updateMissingBlocks( missingBlocks ); + dbImage.updateMissingBlocks( missingBlocks ); } protected void setMissing( int index ) { - if ( blockStatus == null ) - return; synchronized ( blockStatus ) { - blockStatus[index] = missing; + blockStatus[index] = MISSING; } } protected void setNeedsRequest( int index ) { - if ( blockStatus == null ) - return; synchronized ( blockStatus ) { - blockStatus[index] *= ( blockStatus[index] < missing ) ? -1 : 1; // switch to positive value if needed + blockStatus[index] = Math.abs( blockStatus[index] ); // switch to positive value if needed } } protected void setNeedsCheck( int index ) { - if ( blockStatus == null ) - return; synchronized ( blockStatus ) { - blockStatus[index] *= ( blockStatus[index] > missing ) ? -1 : 1; // switch to negative value if needed + blockStatus[index] = -Math.abs( blockStatus[index] ); // switch to negative value if needed } } protected void increaseTransmittedTimes( int index ) { - if ( blockStatus == null || blockStatus[index] == 200 ) - return; synchronized ( blockStatus ) { - blockStatus[index] += ( blockStatus[index] <= missing ) ? -1 : 1; // increase in both directions + if ( blockStatus[index] == 200 ) + return; + blockStatus[index] += ( blockStatus[index] <= MISSING ) ? -1 : 1; // increase in both directions } } protected int getTimesTransmitted( int index ) { synchronized ( blockStatus ) { - return ( blockStatus[index] > 0 ) ? blockStatus[index] : ( -1 ) * blockStatus[index]; + return Math.abs( blockStatus[index] ); } } - protected String getToken() - { - return this.token; - } - protected boolean needsRequest( int index ) { - if ( blockStatus == null ) - return false; synchronized ( blockStatus ) { - return ( ( blockStatus[index] >= missing ) && ( blockStatus[index] != valid ) ); + return ( ( blockStatus[index] >= MISSING ) && ( blockStatus[index] != VALID ) ); } } protected boolean needsCheck( int index ) { - if ( blockStatus == null ) - return false; synchronized ( blockStatus ) { - return ( blockStatus[index] < missing ); + return ( blockStatus[index] < MISSING ); } } protected int getNumberOfBlocks() { - /////////////////////////////////////////////////////////////////// -// ArrayList<Integer> l = new ArrayList<Integer>( blockStatus.length ); -// for ( int i : blockStatus ) { -// l.add( i ); -// } -// log.debug( l ); - /////////////////////////////////////////////////////////////////// return blockStatus.length; } - + protected int getNextMissingBlock() { - // TODO: handle intern status of current uploading block. - for (int i = 0; i < blockStatus.length; i++) { - if (blockStatus[i] == 0) - return i; + synchronized ( blockStatus ) { + for ( int i = 0; i < blockStatus.length; i++ ) { + int index = ( i + lastStatusPos ) % blockStatus.length; + if ( blockStatus[index] == MISSING ) + return lastStatusPos = index; + } + for ( int index = 0; index < blockStatus.length; index++ ) { + if ( blockStatus[index] > MISSING && blockStatus[index] < VALID ) + return lastStatusPos = index; + } + for ( int index = 0; index < blockStatus.length; index++ ) { + if ( blockStatus[index] < MISSING ) + return lastStatusPos = index; + } } return -1; } + /* protected long getTimestamp() { return this.timestamp; } - - protected DbImage getDbImage() - { - if ( dbImage == null ) { - dbImage = DbImage.getImageByUuid( this.uuid ); - } - return this.dbImage; - } - - protected void setDbImage( DbImage dbImage ) - { - if ( dbImage != null ) { - return; - } else { - this.dbImage = dbImage; - } - } + */ protected ImageFile getImageFile() { if ( imageFile == null ) { - imageFile = new ImageFile( filename, Globals.blockSize ); + imageFile = new ImageFile( dbImage.getAbsolutePath(), Globals.blockSize ); } return imageFile; } protected CrcFile getCrcFile() { + if ( crcFile == null ) { + try { + crcFile = new CrcFile( dbImage.getAbsolutePath() + ".crc" ); + } catch ( IOException e ) { + // Not found... return null + } + } return crcFile; } protected void setCrcFile( CrcFile crcFile ) { - this.crcFile = crcFile; + if ( getCrcFile() == null ) { + this.crcFile = crcFile; + try { + crcFile.writeCrcFile( dbImage.getAbsolutePath() + ".crc" ); + } catch ( IOException e ) { + log.error( "Could not write crc list to file", e ); + } + } } public int getAmountOfBlocksNeedingRequest() @@ -228,8 +214,28 @@ public class UploadingImage @Override public String toString() { - return "UUID: " + uuid + ", filename " + filename + "\nmissing blocks " + getAmountOfBlocksNeedingRequest() + - ", number of blocks " + getNumberOfBlocks() + ", token " + getToken(); + return "UUID: " + dbImage.uuid + ", filename " + dbImage.imagePath + "\nmissing blocks " + getAmountOfBlocksNeedingRequest() + + ", number of blocks " + getNumberOfBlocks(); + + } + public String getAbsolutePath() + { + return dbImage.getAbsolutePath(); + } + + public long getFileSize() + { + return dbImage.fileSize; + } + + public String getUuid() + { + return dbImage.uuid; + } + + public void updateMissingBlocks( List<Integer> missingBlocks ) + { + dbImage.updateMissingBlocks( missingBlocks ); } } |