diff options
author | Simon Rettberg | 2016-08-30 18:02:43 +0200 |
---|---|---|
committer | Simon Rettberg | 2016-08-30 18:02:43 +0200 |
commit | f45886abed5f04728561d5c8f97423a8036806fc (patch) | |
tree | 1639d97cb76658925aaf4ddc19b080b8ea273e53 /src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java | |
parent | (WiP) Global image sync (diff) | |
download | masterserver-f45886abed5f04728561d5c8f97423a8036806fc.tar.gz masterserver-f45886abed5f04728561d5c8f97423a8036806fc.tar.xz masterserver-f45886abed5f04728561d5c8f97423a8036806fc.zip |
Implement global image exchange with satellite servers
Diffstat (limited to 'src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java')
-rw-r--r-- | src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java | 167 |
1 files changed, 150 insertions, 17 deletions
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java index 141e17f..f32c655 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java @@ -1,10 +1,13 @@ package org.openslx.imagemaster.serverconnection; +import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.security.KeyStore; +import java.sql.SQLException; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -21,13 +24,17 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.InvocationError; import org.openslx.bwlp.thrift.iface.TInvocationException; import org.openslx.bwlp.thrift.iface.TTransferRejectedException; +import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.IncomingEvent; import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; -import org.openslx.filetransfer.util.AbstractTransfer; import org.openslx.imagemaster.Globals; +import org.openslx.imagemaster.db.mappers.DbImage; +import org.openslx.thrifthelper.ImagePublishDataEx; import org.openslx.util.GrowingThreadPoolExecutor; +import org.openslx.util.QuickTimer; +import org.openslx.util.QuickTimer.Task; /** * Class to handle all incoming and outgoing connections. @@ -40,8 +47,10 @@ public class ConnectionHandler implements IncomingEvent private static final int MAX_TRANSFERS = 12; - private static Map<String, IncomingTransfer> incomingTransfers = new ConcurrentHashMap<>(); - private static Map<String, AbstractTransfer> outgoingTransfers = new ConcurrentHashMap<>(); + private static Map<String, IncomingTransfer> incomingTransfersByTransferId = new ConcurrentHashMap<>(); + private static final Map<String, IncomingTransfer> incomingTransfersByVersionId = new ConcurrentHashMap<>(); + + private static Map<String, OutgoingTransfer> outgoingTransfers = new ConcurrentHashMap<>(); private static IncomingEvent eventHandler = new ConnectionHandler(); private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), @@ -51,7 +60,7 @@ public class ConnectionHandler implements IncomingEvent private static final Listener sslListener; static { - LOGGER.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() ); + LOGGER.debug( "Starting BFTP on port " + Globals.getFiletransferPortSsl() + "+ and " + Globals.getFiletransferPortPlain() ); Listener ssl = null; Listener plain = null; try { @@ -68,32 +77,89 @@ public class ConnectionHandler implements IncomingEvent ssl.start(); plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 ); plain.start(); + // TODO: Bail out/retry if failed, getters for ports } catch ( Exception e ) { LOGGER.error( "Initialization failed.", e ); System.exit( 2 ); } sslListener = ssl; plainListener = plain; + QuickTimer.scheduleAtFixedDelay( new Task() { + + @Override + public void fire() + { + long now = System.currentTimeMillis(); + for ( Iterator<IncomingTransfer> it = incomingTransfersByTransferId.values().iterator(); it.hasNext(); ) { + IncomingTransfer t = it.next(); + if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) { + LOGGER.debug( "Removing transfer " + t.getId() ); + it.remove(); + } + } + for ( Iterator<IncomingTransfer> it = incomingTransfersByVersionId.values().iterator(); it.hasNext(); ) { + IncomingTransfer t = it.next(); + if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) { + it.remove(); + } + } + } + }, 10000, 301000 ); + } + + public static int getSslPort() + { + if ( sslListener.isRunning() ) + return sslListener.getPort(); + return 0; + } + + public static int getPlainPort() + { + if ( plainListener.isRunning() ) + return plainListener.getPort(); + return 0; } - public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes ) + /** + * Register new incoming transfer from a satellite server. + * + * @param img the desired meta data for the new image/version, as supplied by the satellite + * @param blockHashes list of block hashes for the image + * @param existing OPTIONAL if the image version already exists on the server, this is given so + * we can repair/complete the local file, if necessary + * @return The assigned transfer object + */ + public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes, ImagePublishDataEx existing ) throws TTransferRejectedException, TInvocationException { IncomingTransfer transfer; - synchronized ( incomingTransfers ) { - transfer = incomingTransfers.get( img.imageVersionId ); + synchronized ( incomingTransfersByTransferId ) { + transfer = incomingTransfersByVersionId.get( img.imageVersionId ); if ( transfer == null ) { if ( getUploadConnectionCount() >= MAX_TRANSFERS ) { throw new TTransferRejectedException( "Too many active transfers" ); } + File absDestination; + if ( existing == null ) { + absDestination = new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId ); + } else { + absDestination = new File( Globals.getImageDir(), existing.exImagePath ); + } + plainListener.start(); + sslListener.start(); try { - transfer = new IncomingTransfer( img, blockHashes ); + transfer = new IncomingTransfer( img, blockHashes, absDestination, getPlainPort(), getSslPort() ); } catch ( FileNotFoundException e ) { - LOGGER.warn( "Cannot init download", e ); + LOGGER.warn( "Cannot init download to " + absDestination.toString(), e ); throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" ); } - incomingTransfers.put( transfer.getId(), transfer ); - incomingTransfers.put( img.imageVersionId, transfer ); + LOGGER.info( "New incoming upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName + ")" ); + incomingTransfersByTransferId.put( transfer.getId(), transfer ); + incomingTransfersByVersionId.put( img.imageVersionId, transfer ); + } else { + LOGGER.info( "Another request for existing upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName + + ")" ); } } return transfer; @@ -102,7 +168,7 @@ public class ConnectionHandler implements IncomingEvent public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List<ByteBuffer> crcSums ) throws TTransferRejectedException { - IncomingTransfer transfer = incomingTransfers.get( imageData.imageVersionId ); + IncomingTransfer transfer = incomingTransfersByVersionId.get( imageData.imageVersionId ); if ( transfer == null ) return null; if ( transfer.getFileSize() != imageData.fileSize ) @@ -112,15 +178,31 @@ public class ConnectionHandler implements IncomingEvent return transfer; } + public static IncomingTransfer getUploadByToken( String uploadToken ) + { + return incomingTransfersByTransferId.get( uploadToken ); + } + /** * Server is uploading - client is downloading! */ @Override public void incomingDownloadRequest( final Uploader uploader ) { - // TODO - uploader.sendErrorCode( "Too many concurrent uploads." ); - uploader.cancel(); + OutgoingTransfer transfer = outgoingTransfers.get( uploader.getToken() ); + if ( transfer == null ) { + LOGGER.debug( "Unknown download token received" ); + uploader.sendErrorCode( "Unknown download token." ); + uploader.cancel(); + return; + } + if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) { + uploader.sendErrorCode( "Too many concurrent uploads." ); + uploader.cancel(); + } + if ( !transfer.addConnection( uploader, transferPool ) ) { + uploader.cancel(); + } } /** @@ -129,7 +211,7 @@ public class ConnectionHandler implements IncomingEvent @Override public void incomingUploadRequest( final Downloader downloader ) throws IOException { - IncomingTransfer transfer = incomingTransfers.get( downloader.getToken() ); + IncomingTransfer transfer = incomingTransfersByTransferId.get( downloader.getToken() ); if ( transfer == null ) { downloader.sendErrorCode( "Unknown upload token." ); downloader.cancel(); @@ -149,7 +231,7 @@ public class ConnectionHandler implements IncomingEvent { final long now = System.currentTimeMillis(); int active = 0; - for ( IncomingTransfer t : incomingTransfers.values() ) { + for ( IncomingTransfer t : incomingTransfersByTransferId.values() ) { if ( t.countsTowardsConnectionLimit( now ) ) { active += t.getActiveConnectionCount(); } @@ -157,4 +239,55 @@ public class ConnectionHandler implements IncomingEvent return active; } + public static int getDownloadConnectionCount() + { + final long now = System.currentTimeMillis(); + int active = 0; + for ( OutgoingTransfer t : outgoingTransfers.values() ) { + if ( t.countsTowardsConnectionLimit( now ) ) { + active += t.getActiveConnectionCount(); + } + } + return active; + } + + public static void removeUpload( IncomingTransfer transfer ) + { + incomingTransfersByTransferId.remove( transfer.getId() ); + incomingTransfersByVersionId.remove( transfer.getImageVersionId() ); + } + + public static TransferInformation registerDownload( ImagePublishDataEx img ) throws TTransferRejectedException, TInvocationException + { + OutgoingTransfer transfer; + File absSource; + absSource = new File( Globals.getImageDir(), img.exImagePath ); + if ( !absSource.exists() ) { + LOGGER.error( absSource.toString() + " missing!" ); + try { + DbImage.markValid( img.imageVersionId, false ); + } catch ( SQLException e ) { + } + throw new TTransferRejectedException( "File missing on server" ); + } + if ( absSource.length() != img.fileSize ) { + LOGGER.error( absSource.toString() + " has unexpected size (is: " + absSource.length() + ", should: " + img.fileSize + ")" ); + try { + DbImage.markValid( img.imageVersionId, false ); + } catch ( SQLException e ) { + } + throw new TTransferRejectedException( "File corrupted on server" ); + } + synchronized ( outgoingTransfers ) { + if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) { + throw new TTransferRejectedException( "Too many active transfers" ); + } + plainListener.start(); + sslListener.start(); + transfer = new OutgoingTransfer( absSource, getPlainPort(), getSslPort() ); + outgoingTransfers.put( transfer.getId(), transfer ); + } + return transfer.getTransferInfo(); + } + } |