From ecd3d22510aa2f1aa0c44cee015bd690d19f45ce Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 20 Apr 2016 17:10:14 +0200 Subject: More imgsync stuff --- .../bwlp/thrift/iface/ImagePublishData.java | 125 ++++++++++++++++++++- .../java/org/openslx/filetransfer/Listener.java | 86 +++++++++----- .../org/openslx/filetransfer/util/ChunkList.java | 4 +- .../org/openslx/filetransfer/util/HashChecker.java | 2 - .../filetransfer/util/IncomingTransferBase.java | 36 +++--- .../filetransfer/util/OutgoingTransferBase.java | 121 ++++++++++++++++++++ .../openslx/thrifthelper/ImagePublishDataEx.java | 1 + src/main/thrift/bwlp.thrift | 1 + 8 files changed, 323 insertions(+), 53 deletions(-) create mode 100644 src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java (limited to 'src') diff --git a/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java b/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java index f04fd6b..9226df6 100644 --- a/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java +++ b/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-04-18") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-04-20") public class ImagePublishData implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ImagePublishData"); @@ -51,6 +51,7 @@ public class ImagePublishData implements org.apache.thrift.TBase, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -71,6 +72,7 @@ public class ImagePublishData implements org.apache.thrift.TBase byName = new HashMap(); @@ -127,6 +130,8 @@ public class ImagePublishData implements org.apache.thrift.TBase() ); - private static final byte U = 85; // hex - code 'U' = 85. - private static final byte D = 68; // hex - code 'D' = 68. + private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // hex - code 'U' = 85. + private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // hex - code 'D' = 68. private static Logger log = Logger.getLogger( Listener.class ); /***********************************************************************/ @@ -48,8 +54,10 @@ public class Listener * connection, and start Downloader or Uploader. * */ - private boolean listen() + private synchronized boolean listen() { + if ( listenSocket != null ) + return true; try { if ( this.context == null ) { listenSocket = new ServerSocket(); @@ -61,61 +69,81 @@ public class Listener listenSocket.bind( new InetSocketAddress( this.port ) ); } catch ( Exception e ) { log.error( "Cannot listen on port " + this.port, e ); + listenSocket = null; return false; } return true; } - private void run() + private synchronized void run() { + if ( acceptThread != null ) + return; final Listener instance = this; acceptThread = new Thread( "BFTP-Listen-" + this.port ) { @Override public void run() { try { + // Run accept loop in own thread while ( !isInterrupted() ) { - Socket connectionSocket = null; + Socket acceptedSocket = null; try { - connectionSocket = listenSocket.accept(); + acceptedSocket = listenSocket.accept(); } catch ( SocketTimeoutException e ) { continue; } catch ( Exception e ) { log.warn( "Some exception when accepting! Trying to resume...", e ); Transfer.safeClose( listenSocket ); + listenSocket = null; if ( !listen() ) { log.error( "Could not re-open listening socket" ); break; } continue; } - try { - connectionSocket.setSoTimeout( 2000 ); // 2 second timeout enough? Maybe even use a small thread pool for handling accepted connections + // Handle each accepted connection in a thread pool + final Socket connection = acceptedSocket; + Runnable handler = new Runnable() { + @Override + public void run() + { - byte[] b = new byte[ 1 ]; - int length = connectionSocket.getInputStream().read( b ); - if ( length == -1 ) - continue; + try { + // Give initial byte signalling mode of operation 5 secs to arrive + connection.setSoTimeout( 5000 ); - connectionSocket.setSoTimeout( readTimeoutMs ); + byte[] b = new byte[ 1 ]; + int length = connection.getInputStream().read( b ); + if ( length == -1 ) { + Transfer.safeClose( connection ); + return; + } + // Byte arrived, now set desired timeout + connection.setSoTimeout( readTimeoutMs ); - if ( b[0] == U ) { - // --> start Downloader(socket). - Downloader d = new Downloader( connectionSocket ); - incomingEvent.incomingUploadRequest( d ); - } - else if ( b[0] == D ) { - // --> start Uploader(socket). - Uploader u = new Uploader( connectionSocket ); - incomingEvent.incomingDownloadRequest( u ); - } - else { - log.debug( "Got invalid init-byte ... close connection" ); - connectionSocket.close(); + if ( b[0] == CONNECTING_PEER_WANTS_TO_UPLOAD ) { + // --> start Downloader(socket). + Downloader d = new Downloader( connection ); + incomingEvent.incomingUploadRequest( d ); + } else if ( b[0] == CONNECTING_PEER_WANTS_TO_DOWNLOAD ) { + // --> start Uploader(socket). + Uploader u = new Uploader( connection ); + incomingEvent.incomingDownloadRequest( u ); + } else { + log.debug( "Got invalid init-byte ... close connection" ); + Transfer.safeClose( connection ); + } + } catch ( Exception e ) { + log.warn( "Error accepting client", e ); + Transfer.safeClose( connection ); + } } - } catch ( Exception e ) { - log.warn( "Error accepting client", e ); - Transfer.safeClose( connectionSocket ); + }; + try { + processingPool.execute( handler ); + } catch ( RejectedExecutionException e ) { + Transfer.safeClose( acceptedSocket ); } } } finally { diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index 372b082..c497be0 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -174,14 +174,14 @@ public class ChunkList * * @param c The chunk in question */ - public synchronized void markSuccessful( FileChunk c ) + public synchronized void markCompleted( FileChunk c, boolean hashCheckSuccessful ) { if ( !pendingChunks.remove( c ) ) { LOGGER.warn( "Inconsistent state: markSuccessful called for Chunk " + c.toString() + ", but chunk is not marked as currently transferring!" ); return; } - c.setStatus( ChunkStatus.COMPLETE ); + c.setStatus( ( hashCheckSuccessful || c.getSha1Sum() == null ) ? ChunkStatus.COMPLETE : ChunkStatus.HASHING ); completeChunks.add( c ); this.notifyAll(); } diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index d9db7df..5fdf582 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -105,13 +105,11 @@ public class HashChecker } } } - ChunkStatus old = chunk.getStatus(); chunk.setStatus( ChunkStatus.HASHING ); if ( blocking ) { queue.put( task ); } else { if ( !queue.offer( task ) ) { - chunk.setStatus( old ); return false; } } diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 04ddc17..c2d8ee9 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -155,8 +155,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return state; } - public synchronized TransferStatus getStatus() { - return new TransferStatus(chunks.getStatusArray(), getState()); + public synchronized TransferStatus getStatus() + { + return new TransferStatus( chunks.getStatusArray(), getState() ); } public final ChunkList getChunks() @@ -186,9 +187,10 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunks.updateSha1Sums( hashList ); if ( hashChecker == null ) return; - FileChunk chunk; - int cnt = 0; - while ( null != ( chunk = chunks.getUnhashedComplete() ) && ++cnt <= 3 ) { + for ( int cnt = 0; cnt < 3; ++cnt ) { + FileChunk chunk = chunks.getUnhashedComplete(); + if ( chunk == null ) + break; byte[] data; try { data = loadChunkFromFile( chunk ); @@ -200,14 +202,18 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } if ( data == null ) { LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" ); - chunks.markSuccessful( chunk ); + chunks.markCompleted( chunk, true ); chunkStatusChanged( chunk ); continue; } try { - if ( !hashChecker.queue( chunk, data, this, false ) ) // false == blocked while adding, so stop + if ( !hashChecker.queue( chunk, data, this, false ) ) { // false == queue full, stop + chunks.markCompleted( chunk, false ); break; + } } catch ( InterruptedException e ) { + LOGGER.debug( "updateBlockHashList got interrupted" ); + chunks.markCompleted( chunk, false ); Thread.currentThread().interrupt(); return; } @@ -304,7 +310,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } else { // We have no hash checker or the hash for the current chunk is unknown - flush to disk writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer ); - chunks.markSuccessful( currentChunk ); + chunks.markCompleted( currentChunk, true ); chunkStatusChanged( currentChunk ); } } @@ -356,7 +362,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunks.markFailed( cbh.currentChunk ); chunkStatusChanged( cbh.currentChunk ); } - LOGGER.warn( "Download of " + getTmpFileName().getAbsolutePath() + " failed" ); + LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" ); } if ( state != TransferState.FINISHED && state != TransferState.ERROR ) { lastActivityTime.set( System.currentTimeMillis() ); @@ -368,7 +374,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H finishUploadInternal(); } else { // Keep pumping unhashed chunks into the hasher - queueUnhashedChunk(); + queueUnhashedChunk( true ); } } } ); @@ -443,7 +449,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( !chunk.isWrittenToDisk() ) { writeFileData( chunk.range.startOffset, chunk.range.getLength(), data ); } - chunks.markSuccessful( chunk ); + chunks.markCompleted( chunk, true ); chunkStatusChanged( chunk ); if ( chunks.isComplete() ) { finishUploadInternal(); @@ -457,13 +463,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H break; } // A block finished, see if we can queue a new one - queueUnhashedChunk(); + queueUnhashedChunk( false ); } /** * Gets an unhashed chunk (if existent) and queues it for hashing */ - protected void queueUnhashedChunk() + protected void queueUnhashedChunk( boolean blocking ) { FileChunk chunk = chunks.getUnhashedComplete(); if ( chunk == null ) @@ -479,12 +485,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } if ( data == null ) { LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" ); - chunks.markSuccessful( chunk ); + chunks.markCompleted( chunk, true ); chunkStatusChanged( chunk ); return; } try { - hashChecker.queue( chunk, data, this, true ); + hashChecker.queue( chunk, data, this, blocking ); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } diff --git a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java new file mode 100644 index 0000000..12deddb --- /dev/null +++ b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java @@ -0,0 +1,121 @@ +package org.openslx.filetransfer.util; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.openslx.bwlp.thrift.iface.TransferInformation; +import org.openslx.filetransfer.Uploader; + +public abstract class OutgoingTransferBase extends AbstractTransfer +{ + + /* + * Constants + */ + + private static final Logger LOGGER = Logger.getLogger( OutgoingTransferBase.class ); + + private static final long INACTIVITY_TIMEOUT = TimeUnit.MINUTES.toMillis( 5 ); + + /* + * Overridable constants + */ + + protected static int MAX_CONNECTIONS_PER_TRANSFER = 2; + + /* + * Class members + */ + + /** + * Remote peer is downloading, so we have Uploaders + */ + private final List uploads = new ArrayList<>(); + + /** + * File being uploaded + */ + private final File sourceFile; + + private final TransferInformation transferInformation; + + public OutgoingTransferBase( String transferId, File sourceFile, int plainPort, int sslPort ) + { + super( transferId ); + this.sourceFile = sourceFile; + this.transferInformation = new TransferInformation( transferId, plainPort, sslPort ); + } + + /** + * Add another connection for this file transfer. + * + * @param connection + * @return true if the connection is accepted, false if it should be + * discarded + */ + public synchronized boolean addConnection( final Uploader connection, ExecutorService pool ) + { + synchronized ( uploads ) { + if ( uploads.size() >= MAX_CONNECTIONS_PER_TRANSFER ) + return false; + uploads.add( connection ); + } + return runConnectionInternal( connection, pool ); + } + + protected boolean runConnectionInternal( final Uploader connection, ExecutorService pool ) + { + try { + pool.execute( new Runnable() { + @Override + public void run() + { + boolean ret = connection.upload( sourceFile.getAbsolutePath() ); + synchronized ( uploads ) { + uploads.remove( connection ); + } + if ( ret && uploads.isEmpty() && potentialFinishTime.get() == 0 ) { + potentialFinishTime.set( System.currentTimeMillis() ); + } + lastActivityTime.set( System.currentTimeMillis() ); + } + } ); + } catch ( Exception e ) { + LOGGER.warn( "threadpool rejected the incoming file transfer", e ); + synchronized ( uploads ) { + uploads.remove( connection ); + } + return false; + } + return true; + } + + @Override + public TransferInformation getTransferInfo() + { + return transferInformation; + } + + @Override + public final boolean isActive() + { + return uploads.size() > 0 || lastActivityTime.get() + INACTIVITY_TIMEOUT > System.currentTimeMillis(); + } + + @Override + public void cancel() + { + // Void + } + + @Override + public final int getActiveConnectionCount() + { + return uploads.size(); + } + +} diff --git a/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java b/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java index 39accef..5cf0a10 100644 --- a/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java +++ b/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java @@ -6,4 +6,5 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData; public class ImagePublishDataEx extends ImagePublishData { public String exImagePath; + public boolean exIsValid; } diff --git a/src/main/thrift/bwlp.thrift b/src/main/thrift/bwlp.thrift index b0e866a..1389008 100644 --- a/src/main/thrift/bwlp.thrift +++ b/src/main/thrift/bwlp.thrift @@ -228,6 +228,7 @@ struct ImagePublishData { 11: string virtId, 12: bool isTemplate, 13: UserInfo owner, + 14: binary machineDescription, } struct NetRule { -- cgit v1.2.3-55-g7522