summary refs log tree commit diff stats
path: root/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java')
-rw-r--r-- src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java 51
1 file changed, 34 insertions, 17 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index 8a69020..8e68dc2 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -14,7 +14,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.filetransfer.DataReceivedCallback;
@@ -30,7 +31,7 @@ import org.openslx.util.ThriftUtil;
public abstract class IncomingTransferBase extends AbstractTransfer implements HashCheckCallback
{
- private static final Logger LOGGER = Logger.getLogger( IncomingTransferBase.class );
+ private static final Logger LOGGER = LogManager.getLogger( IncomingTransferBase.class );
/**
* Remote peer is uploading, so on our end, we have Downloaders
@@ -153,7 +154,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
@Override
public final int getActiveConnectionCount()
{
- return downloads.size();
+ synchronized ( downloads ) {
+ return downloads.size();
+ }
}
public final boolean hashesEqual( List<ByteBuffer> blockHashes )
@@ -215,11 +218,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
public void updateBlockHashList( List<byte[]> hashList )
{
if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
- LOGGER.debug( this.getId() + ": Rejecting block hash list in state " + state );
+ LOGGER.info( this.getId() + ": Rejecting block hash list in state " + state );
return;
}
if ( hashList == null ) {
- LOGGER.debug( this.getId() + ": Rejecting null block hash list" );
+ LOGGER.info( this.getId() + ": Rejecting null block hash list" );
return;
}
int firstNew = chunks.updateSha1Sums( hashList );
@@ -286,6 +289,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( sources != null && !sources.isEmpty() ) {
chunks.markLocalCopyCandidates( sources );
}
+ if ( state == TransferState.IDLE ) {
+ state = TransferState.WORKING;
+ }
localCopyManager.trigger();
}
@@ -389,7 +395,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
currentChunk = chunks.getMissing();
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
- cancel();
+ LOGGER.info( "Incoming transfer connection was interrupted" );
return null;
}
if ( currentChunk == null ) {
@@ -437,7 +443,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
// We have no hash checker, or hasher rejected block,
// or the hash for the current chunk is unknown - flush to disk
+ long pre = System.currentTimeMillis();
writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
+ long duration = System.currentTimeMillis() - pre;
+ if ( duration > 2000 ) {
+ LOGGER.warn( "Writing chunk to disk before hash check took " + duration + "ms. Storage backend overloaded?" );
+ }
chunks.markCompleted( currentChunk, false );
chunkStatusChanged( currentChunk );
if ( passEx != null )
@@ -463,6 +474,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
@Override
public void run()
{
+ int active;
try {
CbHandler cbh = new CbHandler( connection );
if ( connection.download( cbh, cbh ) ) {
@@ -481,7 +493,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
chunkStatusChanged( cbh.currentChunk );
}
- LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" );
+ LOGGER.info( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped prematurely" );
}
if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
lastActivityTime.set( System.currentTimeMillis() );
@@ -489,6 +501,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
} finally {
synchronized ( downloads ) {
downloads.remove( connection );
+ active = downloads.size();
}
}
if ( chunks.isComplete() ) {
@@ -499,6 +512,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( localCopyManager != null ) {
localCopyManager.trigger();
}
+ LOGGER.info( "Downloader disconnected, " + active + " still running. " + chunks.getStats() );
+ } else {
+ LOGGER.info( "Downloader disconnected, state=" + state + ". " + chunks.getStats() );
}
}
} );
@@ -563,7 +579,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk )
{
if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
- LOGGER.debug( "hashCheckDone called in bad state " + state.name() );
+ LOGGER.warn( "hashCheckDone called in bad state " + state.name() );
return;
}
switch ( result ) {
@@ -576,7 +592,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
chunks.markCompleted( chunk, true );
} else {
try {
+ long pre = System.currentTimeMillis();
writeFileData( chunk.range.startOffset, chunk.range.getLength(), data );
+ long duration = System.currentTimeMillis() - pre;
+ if ( duration > 2000 ) {
+ LOGGER.warn( "Writing chunk to disk after hash check took " + duration + "ms. Storage backend overloaded?" );
+ }
chunks.markCompleted( chunk, true );
} catch ( Exception e ) {
LOGGER.warn( "Cannot write to file after hash check", e );
@@ -600,7 +621,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
// A block finished, see if we can queue a new one
queueUnhashedChunk( false );
- if ( localCopyManager != null ) {
+ if ( localCopyManager != null && localCopyManager.isAlive() ) {
localCopyManager.trigger();
}
}
@@ -617,7 +638,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
try {
data = loadChunkFromFile( chunk );
} catch ( EOFException e1 ) {
- LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking is invalid." );
+ LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking as invalid." );
chunks.markFailed( chunk );
chunkStatusChanged( chunk );
return;
@@ -645,7 +666,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
final synchronized void finishUploadInternal()
{
- if ( state == TransferState.FINISHED ) {
+ if ( state == TransferState.FINISHED || state == TransferState.ERROR ) {
return;
}
try {
@@ -659,13 +680,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( localCopyManager != null ) {
localCopyManager.interrupt();
}
- if ( state != TransferState.WORKING ) {
+ state = TransferState.FINISHED; // Races...
+ if ( !finishIncomingTransfer() ) {
state = TransferState.ERROR;
- } else {
- state = TransferState.FINISHED; // Races...
- if ( !finishIncomingTransfer() ) {
- state = TransferState.ERROR;
- }
}
}