From f5618c87e63deb99920710787f6dcd34d4b17425 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 13 Apr 2016 18:41:29 +0200 Subject: (WiP) Global image sync --- .../serverconnection/AbstractTransfer.java | 92 ----------- .../serverconnection/ConnectionHandler.java | 172 ++++++++++----------- .../serverconnection/IncomingTransfer.java | 81 ++++++++++ .../serverconnection/PrioThreadFactory.java | 24 +++ 4 files changed, 185 insertions(+), 184 deletions(-) delete mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java create mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java create mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java (limited to 'src/main/java/org/openslx/imagemaster/serverconnection') diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java b/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java deleted file mode 100644 index 3acac5b..0000000 --- a/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java +++ /dev/null @@ -1,92 +0,0 @@ -package org.openslx.imagemaster.serverconnection; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -public abstract class AbstractTransfer { - - /** - * How long to keep this transfer information when the transfer is - * (potentially) done - */ - private static final long FINISH_TIMEOUT = TimeUnit.MINUTES.toMillis(5); - - /** - * How long to keep this transfer information when there are no active - * connections and the transfer seems unfinished - */ - private static final long IDLE_TIMEOUT = TimeUnit.HOURS.toMillis(4); - - /** - * Time stamp of when (we think) the transfer finished. Clients can/might - * not tell us they're done, and simply taking "no active connection" as a - * sign the download is done might have unwanted effects if the user's - * connection drops for a minute. If this time stamp (plus a FINISH_TIMEOUT) - * passed, - * we consider the download done and flag it for removal. - * If set to zero, the transfer is not finished, or not assumed to have - * finished. - */ - protected final AtomicLong potentialFinishTime = new AtomicLong(0); - - /** - * Time of last activity on this transfer. - */ - protected final AtomicLong lastActivityTime = new AtomicLong(System.currentTimeMillis()); - - private final String transferId; - - public AbstractTransfer(String transferId) { - this.transferId = transferId; - } - - /** - * Returns true if the transfer is considered completed. - * - * @param now pass System.currentTimeMillis() - * @return true if the transfer is considered completed - */ - public boolean isComplete(long now) { - long val = potentialFinishTime.get(); - return val != 0 && val + FINISH_TIMEOUT < now; - } - - /** - * Returns true if there has been no activity on this transfer for a certain - * amount of time. - * - * @param now pass System.currentTimeMillis() - * @return true if the transfer reached its idle timeout - */ - public final boolean hasReachedIdleTimeout(long now) { - return getActiveConnectionCount() == 0 && lastActivityTime.get() + IDLE_TIMEOUT < now; - } - - public final String getId() { - return transferId; - } - - /** - * Returns true if this transfer would potentially accept new connections. - * This should NOT return false if there are too many concurrent - * connections, as this is used to signal the client whether to keep trying - * to connect. - * - * @return true if this transfer would potentially accept new connections - */ - public abstract boolean isActive(); - - /** - * Cancel this transfer, aborting all active connections and rejecting - * further incoming ones. - */ - public abstract void cancel(); - - /** - * Returns number of active transfer connections. - * - * @return number of active transfer connections - */ - public abstract int getActiveConnectionCount(); - -} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java index 44c8e16..141e17f 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java @@ -1,13 +1,15 @@ package org.openslx.imagemaster.serverconnection; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.KeyStore; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.net.ssl.KeyManager; @@ -19,17 +21,13 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.InvocationError; import org.openslx.bwlp.thrift.iface.TInvocationException; import org.openslx.bwlp.thrift.iface.TTransferRejectedException; -import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.IncomingEvent; import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; +import org.openslx.filetransfer.util.AbstractTransfer; import org.openslx.imagemaster.Globals; -import org.openslx.imagemaster.crcchecker.CrcFile; -import org.openslx.imagemaster.db.mappers.DbOsVirt; -import org.openslx.imagemaster.db.mappers.DbUser; -import org.openslx.imagemaster.util.RandomString; -import org.openslx.imagemaster.util.Util; +import org.openslx.util.GrowingThreadPoolExecutor; /** * Class to handle all incoming and outgoing connections. @@ -38,19 +36,24 @@ import org.openslx.imagemaster.util.Util; public class ConnectionHandler implements IncomingEvent { - private static Logger log = Logger.getLogger( ConnectionHandler.class ); - private static SSLContext sslContext; + private static final Logger LOGGER = Logger.getLogger( ConnectionHandler.class ); - private static Map pendingIncomingUploads = new ConcurrentHashMap<>(); - private static Map pendingIncomingDownloads = new ConcurrentHashMap<>(); + private static final int MAX_TRANSFERS = 12; + + private static Map incomingTransfers = new ConcurrentHashMap<>(); + private static Map outgoingTransfers = new ConcurrentHashMap<>(); private static IncomingEvent eventHandler = new ConnectionHandler(); - private static ThreadPoolExecutor uploadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue() ); - private static ThreadPoolExecutor downloadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue() ); + private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES, + new SynchronousQueue(), + new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ) ); - private static Listener listener; + private static final Listener plainListener; + private static final Listener sslListener; static { - log.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() ); + LOGGER.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() ); + Listener ssl = null; + Listener plain = null; try { String pathToKeyStore = Globals.getSslKeystoreFile(); char[] passphrase = Globals.getSslKeystorePassword().toCharArray(); @@ -58,94 +61,55 @@ public class ConnectionHandler implements IncomingEvent keystore.load( new FileInputStream( pathToKeyStore ), passphrase ); KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() ); kmf.init( keystore, passphrase ); - sslContext = SSLContext.getInstance( "TLSv1.2" ); + SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" ); KeyManager[] keyManagers = kmf.getKeyManagers(); sslContext.init( keyManagers, null, null ); - listener = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 ); - listener.start(); + ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 ); + ssl.start(); + plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 ); + plain.start(); } catch ( Exception e ) { - log.error( "Initialization failed.", e ); + LOGGER.error( "Initialization failed.", e ); System.exit( 2 ); } + sslListener = ssl; + plainListener = plain; } - /** - * Checks if this image is already uploading and returns a new list with missing blocks if so. - * Puts the new image into processing list else. - * - * @param serverSessionId The uploading server - * @param imageData The data of the image - * @return - * @throws UploadException If some error occurred during the process - */ - public static TransferInformation getUploadInfos( ImagePublishData imageData, List crcSums ) + public static IncomingTransfer registerUpload( ImagePublishData img, List blockHashes ) throws TTransferRejectedException, TInvocationException { - // check image data - if ( Util.isEmpty( imageData.imageName ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Image name not set" ); - if ( !DbUser.exists( imageData.user ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Invalid or missing image owner" ); - if ( DbOsVirt.osExists( imageData.osId ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Content operating system not set" ); - if ( DbOsVirt.virtExists( imageData.virtId ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Content virtualizer system not set" ); - if ( imageData.fileSize <= 0 ) - throw new TInvocationException( InvocationError.INVALID_DATA, "File size is too small" ); - - log.debug( "A satellite is submitting " + imageData.imageVersionId ); - - final String uuid = imageData.imageVersionId; - final String filepathRelative; - final CrcFile crcFile; - if ( crcSums == null ) { - crcFile = null; - } else { - crcFile = new CrcFile( crcSums ); - } - ImagePublishData image; - - synchronized ( pendingIncomingUploads ) { - /* - // check if image is already uploading - if ( ( image = uploadingImages.get( uuid ) ) == null ) { - // TODO insert new image to DB - uploadingImages.put( uuid, image ); + IncomingTransfer transfer; + synchronized ( incomingTransfers ) { + transfer = incomingTransfers.get( img.imageVersionId ); + if ( transfer == null ) { + if ( getUploadConnectionCount() >= MAX_TRANSFERS ) { + throw new TTransferRejectedException( "Too many active transfers" ); + } + try { + transfer = new IncomingTransfer( img, blockHashes ); + } catch ( FileNotFoundException e ) { + LOGGER.warn( "Cannot init download", e ); + throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" ); + } + incomingTransfers.put( transfer.getId(), transfer ); + incomingTransfers.put( img.imageVersionId, transfer ); } - */ } - - final String token = RandomString.generate( 50, false ); - - // TODO addUpload( token, image ); - // TODO Set crc file on image - if there is already a crc file assigned, this does nothing - return new TransferInformation( token, Globals.getFiletransferPortPlain(), Globals.getFiletransferPortSsl() ); - } - - /** - * Add a new allowed incoming upload connection - * for the given token and image. - * - * @param token The unique token - * @param image Image being uploaded - */ - public static void addUpload( String token, AbstractTransfer image ) - { - pendingIncomingUploads.put( token, image ); - log.debug( "Added upload" ); + return transfer; } - /** - * Add a new allowed incoming download connection - * for the given token and image. - * - * @param token The unique token - * @param image Image being uploaded - */ - public static void addDownload( String token, AbstractTransfer image ) + public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List crcSums ) + throws TTransferRejectedException { - pendingIncomingDownloads.put( token, image ); - log.debug( "Added download" ); + IncomingTransfer transfer = incomingTransfers.get( imageData.imageVersionId ); + if ( transfer == null ) + return null; + if ( transfer.getFileSize() != imageData.fileSize ) + throw new TTransferRejectedException( "File size mismatch" ); + if ( !transfer.hashesEqual( crcSums ) ) + throw new TTransferRejectedException( "Block hashes mismatch" ); + return transfer; } /** @@ -165,8 +129,32 @@ public class ConnectionHandler implements IncomingEvent @Override public void incomingUploadRequest( final Downloader downloader ) throws IOException { - // TODO - downloader.sendErrorCode( "Too many concurrent downloads." ); - downloader.cancel(); + IncomingTransfer transfer = incomingTransfers.get( downloader.getToken() ); + if ( transfer == null ) { + downloader.sendErrorCode( "Unknown upload token." ); + downloader.cancel(); + return; + } + if ( getUploadConnectionCount() >= MAX_TRANSFERS ) { + downloader.sendErrorCode( "Too many concurrent uploads." ); + downloader.cancel(); + return; + } + if ( !transfer.addConnection( downloader, transferPool ) ) { + downloader.cancel(); + } } + + public static int getUploadConnectionCount() + { + final long now = System.currentTimeMillis(); + int active = 0; + for ( IncomingTransfer t : incomingTransfers.values() ) { + if ( t.countsTowardsConnectionLimit( now ) ) { + active += t.getActiveConnectionCount(); + } + } + return active; + } + } diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java b/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java new file mode 100644 index 0000000..bfc65e1 --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java @@ -0,0 +1,81 @@ +package org.openslx.imagemaster.serverconnection; + +import java.io.File; +import java.io.FileNotFoundException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.UUID; + +import org.openslx.bwlp.thrift.iface.ImagePublishData; +import org.openslx.bwlp.thrift.iface.TInvocationException; +import org.openslx.bwlp.thrift.iface.TransferInformation; +import org.openslx.filetransfer.util.ChunkStatus; +import org.openslx.filetransfer.util.FileChunk; +import org.openslx.filetransfer.util.IncomingTransferBase; +import org.openslx.imagemaster.Globals; +import org.openslx.imagemaster.db.mappers.DbImageBlock; +import org.openslx.imagemaster.util.Util; +import org.openslx.util.ThriftUtil; + +public class IncomingTransfer extends IncomingTransferBase +{ + + private static final long MIN_FREE_SPACE_BYTES = FileChunk.CHUNK_SIZE * 10; + + private final String imageVersionId; + + public IncomingTransfer( ImagePublishData img, List blockHashes ) + throws TInvocationException, FileNotFoundException + { + super( UUID.randomUUID().toString(), new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId ), + img.fileSize, ThriftUtil.unwrapByteBufferList( blockHashes ) ); + this.imageVersionId = img.imageVersionId; + } + + @Override + public String getRelativePath() + { + return Util.getRelativePath( getTmpFileName(), new File( Globals.getImageDir() ) ); + } + + @Override + public synchronized void cancel() + { + super.cancel(); + getTmpFileName().delete(); + } + + @Override + protected boolean hasEnoughFreeSpace() + { + long space = Globals.getImagePath().getUsableSpace(); + return space > MIN_FREE_SPACE_BYTES; + } + + @Override + protected boolean finishIncomingTransfer() + { + potentialFinishTime.set( System.currentTimeMillis() ); + return true; + } + + @Override + public TransferInformation getTransferInfo() + { + return new TransferInformation( getId(), Globals.getFiletransferPortPlain(), Globals.getFiletransferPortSsl() ); + } + + @Override + protected void chunkStatusChanged( FileChunk chunk ) + { + ChunkStatus status = chunk.getStatus(); + if ( status == ChunkStatus.MISSING || status == ChunkStatus.COMPLETE ) { + try { + DbImageBlock.asyncUpdate( imageVersionId, chunk ); + } catch ( InterruptedException e ) { + e.printStackTrace(); + } + } + } + +} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java b/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java new file mode 100644 index 0000000..5fa9da4 --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java @@ -0,0 +1,24 @@ +package org.openslx.imagemaster.serverconnection; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class PrioThreadFactory implements ThreadFactory { + + private final AtomicInteger counter = new AtomicInteger(); + private final String name; + private final int priority; + + public PrioThreadFactory(String name, int priority) { + this.name = name; + this.priority = priority; + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, name + "-" + counter.incrementAndGet()); + thread.setPriority(priority); + return thread; + } + +} -- cgit v1.2.3-55-g7522