summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util')
-rw-r--r--src/main/java/org/openslx/filetransfer/util/ChunkList.java57
-rw-r--r--src/main/java/org/openslx/filetransfer/util/FileChunk.java5
-rw-r--r--src/main/java/org/openslx/filetransfer/util/HashChecker.java65
-rw-r--r--src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java59
-rw-r--r--src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java5
-rw-r--r--src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java15
6 files changed, 147 insertions, 59 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index 91d6f1e..27f8e8c 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -1,5 +1,6 @@
package org.openslx.filetransfer.util;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -10,14 +11,15 @@ import java.util.LinkedList;
import java.util.List;
import java.util.zip.CRC32;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.util.ThriftUtil;
public class ChunkList
{
- private static final Logger LOGGER = Logger.getLogger( ChunkList.class );
+ private static final Logger LOGGER = LogManager.getLogger( ChunkList.class );
/**
* Here we keep a list of all chunks in the proper order, in case we quickly need to access one
@@ -91,7 +93,7 @@ public class ChunkList
* Get CRC32 list in DNBD3 format. All checksums are little
* endian and prefixed by the crc32 sum of the list itself.
*/
- public synchronized byte[] getDnbd3Crc32List() throws IOException
+ public synchronized byte[] getDnbd3Crc32List() throws IllegalStateException
{
byte buffer[] = new byte[ allChunks.size() * 4 + 4 ]; // 4 byte per chunk plus master
long nextChunkOffset = 0;
@@ -142,7 +144,6 @@ public class ChunkList
* Returns true if this list contains a chunk with state MISSING,
* which means the chunk doesn't have a sha1 known to exist in
* another image.
- * @return
*/
public synchronized boolean hasLocallyMissingChunk()
{
@@ -204,7 +205,7 @@ public class ChunkList
missingChunks.addAll( append );
}
} catch ( Exception e ) {
- LOGGER.warn( "chunk clone list if messed up", e );
+ LOGGER.warn( "chunk clone list is messed up", e );
}
}
}
@@ -414,6 +415,35 @@ public class ChunkList
return sb.toString();
}
+ public synchronized String getStats()
+ {
+ int complete = 0, copying = 0, hashing = 0, missing = 0, qfc = 0, uploading = 0;
+ for ( FileChunk chunk : allChunks ) {
+ switch ( chunk.status ) {
+ case COMPLETE:
+ complete++;
+ break;
+ case COPYING:
+ copying++;
+ break;
+ case HASHING:
+ hashing++;
+ break;
+ case MISSING:
+ missing++;
+ break;
+ case QUEUED_FOR_COPY:
+ qfc++;
+ break;
+ case UPLOADING:
+ uploading++;
+ break;
+ }
+ }
+ return "(" + allChunks.size() + ":" + completeChunks.size() + "/" + pendingChunks.size() + "/" + missingChunks.size() + ")"
+ + " (" + complete + "/" + copying + "/" + hashing + "/" + missing + "/" + qfc + "/" + uploading + ")";
+ }
+
public synchronized boolean isEmpty()
{
return allChunks.isEmpty();
@@ -491,4 +521,21 @@ public class ChunkList
return chunk.sha1sum != null && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, chunk.sha1sum );
}
+ /**
+ * Write DNBD3 CRC32 list to given file.
+ *
+ * @throws IllegalStateException
+ * @throws IOException
+ */
+ public void writeCrc32List( String crcfile ) throws IllegalStateException, IOException
+ {
+ byte[] dnbd3Crc32List = null;
+ dnbd3Crc32List = getDnbd3Crc32List();
+ if ( dnbd3Crc32List != null ) {
+ try ( FileOutputStream fos = new FileOutputStream( crcfile ) ) {
+ fos.write( dnbd3Crc32List );
+ }
+ }
+ }
+
}
diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
index 6594e31..99b30ea 100644
--- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java
+++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
@@ -5,14 +5,15 @@ import java.util.Iterator;
import java.util.List;
import java.util.zip.CRC32;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
public class FileChunk
{
- private static final Logger LOGGER = Logger.getLogger( FileChunk.class );
+ private static final Logger LOGGER = LogManager.getLogger( FileChunk.class );
/**
* Length in bytes of binary sha1 representation
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
index f6b27f7..abbcd35 100644
--- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java
+++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
@@ -9,15 +9,18 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class HashChecker
{
public static final int BLOCKING = 1;
- public static final int CALC_HASH = 2;
+ public static final int CHECK_SHA1 = 2;
public static final int CALC_CRC32 = 4;
-
- private static final Logger LOGGER = Logger.getLogger( HashChecker.class );
+ public static final int CALC_SHA1 = 8;
+ public static final int NO_SLOW_WARN = 16;
+
+ private static final Logger LOGGER = LogManager.getLogger( HashChecker.class );
private final BlockingQueue<HashTask> queue;
@@ -26,7 +29,7 @@ public class HashChecker
private final String algorithm;
private boolean invalid = false;
-
+
private final int queueCapacity;
public HashChecker( String algorithm ) throws NoSuchAlgorithmException
@@ -96,11 +99,12 @@ public class HashChecker
public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, int flags ) throws InterruptedException
{
boolean blocking = ( flags & BLOCKING ) != 0;
- boolean doHash = ( flags & CALC_HASH ) != 0;
- boolean doCrc32 = ( flags & CALC_CRC32 ) != 0;
- if ( doHash && chunk.getSha1Sum() == null )
+ boolean checkSha1 = ( flags & CHECK_SHA1 ) != 0;
+ boolean calcCrc32 = ( flags & CALC_CRC32 ) != 0;
+ boolean calcSha1 = ( flags & CALC_SHA1 ) != 0;
+ if ( checkSha1 && chunk.getSha1Sum() == null )
throw new NullPointerException( "Chunk has no sha1 hash" );
- HashTask task = new HashTask( data, chunk, callback, doHash, doCrc32 );
+ HashTask task = new HashTask( data, chunk, callback, checkSha1, calcCrc32, calcSha1 );
synchronized ( threads ) {
if ( invalid ) {
execCallback( task, HashResult.FAILURE );
@@ -132,11 +136,18 @@ public class HashChecker
}
}
}
- if ( doHash ) {
+ if ( checkSha1 ) {
chunk.setStatus( ChunkStatus.HASHING );
}
if ( blocking ) {
+ long pre = System.currentTimeMillis();
queue.put( task );
+ if ( ( flags & NO_SLOW_WARN ) == 0 ) {
+ long duration = System.currentTimeMillis() - pre;
+ if ( duration > 1000 ) {
+ LOGGER.warn( "HashChecker.queue() took " + duration + "ms" );
+ }
+ }
} else {
if ( !queue.offer( task ) ) {
return false;
@@ -152,7 +163,7 @@ public class HashChecker
{
return queue.size();
}
-
+
public int getQueueCapacity()
{
return queueCapacity;
@@ -202,15 +213,19 @@ public class HashChecker
break;
}
HashResult result = HashResult.NONE;
- if ( task.doHash ) {
+ if ( task.checkSha1 || task.calcSha1 ) {
// Calculate digest
- md.update( task.data, 0, task.chunk.range.getLength() );
- byte[] digest = md.digest();
- result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
+ md.update( task.data, 0, task.chunk.range.getLength() );
+ byte[] digest = md.digest();
+ if ( task.checkSha1 ) {
+ result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
+ } else {
+ task.chunk.setSha1Sum( digest );
+ }
}
- if ( task.doCrc32 ) {
- // Calculate CRC32
- task.chunk.calculateDnbd3Crc32( task.data );
+ if ( task.calcCrc32 ) {
+ // Calculate CRC32
+ task.chunk.calculateDnbd3Crc32( task.data );
}
execCallback( task, result );
}
@@ -223,7 +238,7 @@ public class HashChecker
public static enum HashResult
{
- NONE, // No hashing tool place
+ NONE, // No hashing took place
VALID, // Hash matches
INVALID, // Hash does not match
FAILURE // Error calculating hash
@@ -234,16 +249,18 @@ public class HashChecker
public final byte[] data;
public final FileChunk chunk;
public final HashCheckCallback callback;
- public final boolean doHash;
- public final boolean doCrc32;
+ public final boolean checkSha1;
+ public final boolean calcCrc32;
+ public final boolean calcSha1;
- public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean doHash, boolean doCrc32 )
+ public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean checkSha1, boolean calcCrc32, boolean calcSha1 )
{
this.data = data;
this.chunk = chunk;
this.callback = callback;
- this.doHash = doHash;
- this.doCrc32 = doCrc32;
+ this.checkSha1 = checkSha1;
+ this.calcCrc32 = calcCrc32;
+ this.calcSha1 = calcSha1;
}
}
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index 8a69020..5cca7b8 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -14,7 +14,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.filetransfer.DataReceivedCallback;
@@ -30,7 +31,7 @@ import org.openslx.util.ThriftUtil;
public abstract class IncomingTransferBase extends AbstractTransfer implements HashCheckCallback
{
- private static final Logger LOGGER = Logger.getLogger( IncomingTransferBase.class );
+ private static final Logger LOGGER = LogManager.getLogger( IncomingTransferBase.class );
/**
* Remote peer is uploading, so on our end, we have Downloaders
@@ -153,7 +154,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
@Override
public final int getActiveConnectionCount()
{
- return downloads.size();
+ synchronized ( downloads ) {
+ return downloads.size();
+ }
}
public final boolean hashesEqual( List<ByteBuffer> blockHashes )
@@ -215,11 +218,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
public void updateBlockHashList( List<byte[]> hashList )
{
if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
- LOGGER.debug( this.getId() + ": Rejecting block hash list in state " + state );
+ LOGGER.info( this.getId() + ": Rejecting block hash list in state " + state );
return;
}
if ( hashList == null ) {
- LOGGER.debug( this.getId() + ": Rejecting null block hash list" );
+ LOGGER.info( this.getId() + ": Rejecting null block hash list" );
return;
}
int firstNew = chunks.updateSha1Sums( hashList );
@@ -249,7 +252,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
continue;
}
try {
- if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) { // false == queue full, stop
+ if ( !hashChecker.queue( chunk, data, this, HashChecker.CHECK_SHA1 ) ) { // false == queue full, stop
chunks.markCompleted( chunk, false );
break;
}
@@ -286,6 +289,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( sources != null && !sources.isEmpty() ) {
chunks.markLocalCopyCandidates( sources );
}
+ if ( state == TransferState.IDLE ) {
+ state = TransferState.WORKING;
+ }
localCopyManager.trigger();
}
@@ -389,7 +395,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
currentChunk = chunks.getMissing();
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
- cancel();
+ LOGGER.info( "Incoming transfer connection was interrupted" );
return null;
}
if ( currentChunk == null ) {
@@ -429,7 +435,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
InterruptedException passEx = null;
if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
try {
- hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH );
+ hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CHECK_SHA1 );
return true;
} catch ( InterruptedException e ) {
passEx = e;
@@ -437,7 +443,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
// We have no hash checker, or hasher rejected block,
// or the hash for the current chunk is unknown - flush to disk
+ long pre = System.currentTimeMillis();
writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
+ long duration = System.currentTimeMillis() - pre;
+ if ( duration > 2000 ) {
+ LOGGER.warn( "Writing chunk to disk before hash check took " + duration + "ms. Storage backend overloaded?" );
+ }
chunks.markCompleted( currentChunk, false );
chunkStatusChanged( currentChunk );
if ( passEx != null )
@@ -463,6 +474,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
@Override
public void run()
{
+ int active;
try {
CbHandler cbh = new CbHandler( connection );
if ( connection.download( cbh, cbh ) ) {
@@ -481,7 +493,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
chunkStatusChanged( cbh.currentChunk );
}
- LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" );
+ LOGGER.info( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped prematurely" );
}
if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
lastActivityTime.set( System.currentTimeMillis() );
@@ -489,6 +501,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
} finally {
synchronized ( downloads ) {
downloads.remove( connection );
+ active = downloads.size();
}
}
if ( chunks.isComplete() ) {
@@ -499,6 +512,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( localCopyManager != null ) {
localCopyManager.trigger();
}
+ LOGGER.info( "Downloader disconnected, " + active + " still running. " + chunks.getStats() );
+ } else {
+ LOGGER.info( "Downloader disconnected, state=" + state + ". " + chunks.getStats() );
}
}
} );
@@ -563,7 +579,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk )
{
if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
- LOGGER.debug( "hashCheckDone called in bad state " + state.name() );
+ LOGGER.warn( "hashCheckDone called in bad state " + state.name() );
return;
}
switch ( result ) {
@@ -576,7 +592,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
chunks.markCompleted( chunk, true );
} else {
try {
+ long pre = System.currentTimeMillis();
writeFileData( chunk.range.startOffset, chunk.range.getLength(), data );
+ long duration = System.currentTimeMillis() - pre;
+ if ( duration > 2000 ) {
+ LOGGER.warn( "Writing chunk to disk after hash check took " + duration + "ms. Storage backend overloaded?" );
+ }
chunks.markCompleted( chunk, true );
} catch ( Exception e ) {
LOGGER.warn( "Cannot write to file after hash check", e );
@@ -600,7 +621,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
// A block finished, see if we can queue a new one
queueUnhashedChunk( false );
- if ( localCopyManager != null ) {
+ if ( localCopyManager != null && localCopyManager.isAlive() ) {
localCopyManager.trigger();
}
}
@@ -617,7 +638,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
try {
data = loadChunkFromFile( chunk );
} catch ( EOFException e1 ) {
- LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking is invalid." );
+ LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking as invalid." );
chunks.markFailed( chunk );
chunkStatusChanged( chunk );
return;
@@ -629,7 +650,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
return;
}
try {
- int flags = HashChecker.CALC_HASH;
+ int flags = HashChecker.CHECK_SHA1;
if ( blocking ) {
flags |= HashChecker.BLOCKING;
}
@@ -645,7 +666,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
final synchronized void finishUploadInternal()
{
- if ( state == TransferState.FINISHED ) {
+ if ( state == TransferState.FINISHED || state == TransferState.ERROR ) {
return;
}
try {
@@ -659,17 +680,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( localCopyManager != null ) {
localCopyManager.interrupt();
}
- if ( state != TransferState.WORKING ) {
+ state = TransferState.FINISHED; // Races...
+ if ( !finishIncomingTransfer() ) {
state = TransferState.ERROR;
- } else {
- state = TransferState.FINISHED; // Races...
- if ( !finishIncomingTransfer() ) {
- state = TransferState.ERROR;
- }
}
}
- protected HashChecker getHashChecker()
+ public static HashChecker getHashChecker()
{
return hashChecker;
}
diff --git a/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java
index 54dd2d0..e1fad97 100644
--- a/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java
+++ b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java
@@ -9,7 +9,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.filetransfer.LocalChunkSource.SourceFile;
import org.openslx.util.Util;
@@ -17,7 +18,7 @@ import org.openslx.util.Util;
public class LocalCopyManager extends Thread
{
- private static final Logger LOGGER = Logger.getLogger( LocalCopyManager.class );
+ private static final Logger LOGGER = LogManager.getLogger( LocalCopyManager.class );
private FileChunk currentChunk = null;
diff --git a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
index 18296c5..ad2e96c 100644
--- a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
@@ -6,7 +6,8 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.filetransfer.Uploader;
@@ -17,7 +18,7 @@ public abstract class OutgoingTransferBase extends AbstractTransfer
* Constants
*/
- private static final Logger LOGGER = Logger.getLogger( OutgoingTransferBase.class );
+ private static final Logger LOGGER = LogManager.getLogger( OutgoingTransferBase.class );
private static final long INACTIVITY_TIMEOUT = TimeUnit.MINUTES.toMillis( 5 );
@@ -74,9 +75,13 @@ public abstract class OutgoingTransferBase extends AbstractTransfer
@Override
public void run()
{
- boolean ret = connection.upload( sourceFile.getAbsolutePath() );
- synchronized ( uploads ) {
- uploads.remove( connection );
+ boolean ret = false;
+ try {
+ ret = connection.upload( sourceFile.getAbsolutePath() );
+ } finally {
+ synchronized ( uploads ) {
+ uploads.remove( connection );
+ }
}
if ( ret ) {
connectFails.set( 0 );