diff options
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java')
-rw-r--r-- | src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java | 51 |
1 files changed, 34 insertions, 17 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 8a69020..8e68dc2 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -14,7 +14,8 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.filetransfer.DataReceivedCallback; @@ -30,7 +31,7 @@ import org.openslx.util.ThriftUtil; public abstract class IncomingTransferBase extends AbstractTransfer implements HashCheckCallback { - private static final Logger LOGGER = Logger.getLogger( IncomingTransferBase.class ); + private static final Logger LOGGER = LogManager.getLogger( IncomingTransferBase.class ); /** * Remote peer is uploading, so on our end, we have Downloaders @@ -153,7 +154,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H @Override public final int getActiveConnectionCount() { - return downloads.size(); + synchronized ( downloads ) { + return downloads.size(); + } } public final boolean hashesEqual( List<ByteBuffer> blockHashes ) @@ -215,11 +218,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H public void updateBlockHashList( List<byte[]> hashList ) { if ( state != TransferState.IDLE && state != TransferState.WORKING ) { - LOGGER.debug( this.getId() + ": Rejecting block hash list in state " + state ); + LOGGER.info( this.getId() + ": Rejecting block hash list in state " + state ); return; } if ( hashList == null ) { - LOGGER.debug( this.getId() + ": Rejecting null block hash list" ); + LOGGER.info( this.getId() + ": Rejecting null block hash list" ); return; } int firstNew = chunks.updateSha1Sums( hashList ); @@ -286,6 +289,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( sources != null && !sources.isEmpty() ) { chunks.markLocalCopyCandidates( sources ); } + if ( state == TransferState.IDLE ) { + state = TransferState.WORKING; + } localCopyManager.trigger(); } @@ -389,7 +395,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H currentChunk = chunks.getMissing(); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); - cancel(); + LOGGER.info( "Incoming transfer connection was interrupted" ); return null; } if ( currentChunk == null ) { @@ -437,7 +443,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } // We have no hash checker, or hasher rejected block, // or the hash for the current chunk is unknown - flush to disk + long pre = System.currentTimeMillis(); writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer ); + long duration = System.currentTimeMillis() - pre; + if ( duration > 2000 ) { + LOGGER.warn( "Writing chunk to disk before hash check took " + duration + "ms. Storage backend overloaded?" ); + } chunks.markCompleted( currentChunk, false ); chunkStatusChanged( currentChunk ); if ( passEx != null ) @@ -463,6 +474,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H @Override public void run() { + int active; try { CbHandler cbh = new CbHandler( connection ); if ( connection.download( cbh, cbh ) ) { @@ -481,7 +493,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } chunkStatusChanged( cbh.currentChunk ); } - LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" ); + LOGGER.info( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped prematurely" ); } if ( state != TransferState.FINISHED && state != TransferState.ERROR ) { lastActivityTime.set( System.currentTimeMillis() ); @@ -489,6 +501,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } finally { synchronized ( downloads ) { downloads.remove( connection ); + active = downloads.size(); } } if ( chunks.isComplete() ) { @@ -499,6 +512,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( localCopyManager != null ) { localCopyManager.trigger(); } + LOGGER.info( "Downloader disconnected, " + active + " still running. " + chunks.getStats() ); + } else { + LOGGER.info( "Downloader disconnected, state=" + state + ". " + chunks.getStats() ); } } } ); @@ -563,7 +579,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk ) { if ( state != TransferState.IDLE && state != TransferState.WORKING ) { - LOGGER.debug( "hashCheckDone called in bad state " + state.name() ); + LOGGER.warn( "hashCheckDone called in bad state " + state.name() ); return; } switch ( result ) { @@ -576,7 +592,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunks.markCompleted( chunk, true ); } else { try { + long pre = System.currentTimeMillis(); writeFileData( chunk.range.startOffset, chunk.range.getLength(), data ); + long duration = System.currentTimeMillis() - pre; + if ( duration > 2000 ) { + LOGGER.warn( "Writing chunk to disk after hash check took " + duration + "ms. Storage backend overloaded?" ); + } chunks.markCompleted( chunk, true ); } catch ( Exception e ) { LOGGER.warn( "Cannot write to file after hash check", e ); @@ -600,7 +621,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } // A block finished, see if we can queue a new one queueUnhashedChunk( false ); - if ( localCopyManager != null ) { + if ( localCopyManager != null && localCopyManager.isAlive() ) { localCopyManager.trigger(); } } @@ -617,7 +638,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H try { data = loadChunkFromFile( chunk ); } catch ( EOFException e1 ) { - LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking is invalid." ); + LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking as invalid." ); chunks.markFailed( chunk ); chunkStatusChanged( chunk ); return; @@ -645,7 +666,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H final synchronized void finishUploadInternal() { - if ( state == TransferState.FINISHED ) { + if ( state == TransferState.FINISHED || state == TransferState.ERROR ) { return; } try { @@ -659,13 +680,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( localCopyManager != null ) { localCopyManager.interrupt(); } - if ( state != TransferState.WORKING ) { + state = TransferState.FINISHED; // Races... + if ( !finishIncomingTransfer() ) { state = TransferState.ERROR; - } else { - state = TransferState.FINISHED; // Races... - if ( !finishIncomingTransfer() ) { - state = TransferState.ERROR; - } } } |