diff options
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util')
6 files changed, 147 insertions, 59 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index 91d6f1e..27f8e8c 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -1,5 +1,6 @@ package org.openslx.filetransfer.util; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -10,14 +11,15 @@ import java.util.LinkedList; import java.util.List; import java.util.zip.CRC32; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.filetransfer.LocalChunkSource.ChunkSource; import org.openslx.util.ThriftUtil; public class ChunkList { - private static final Logger LOGGER = Logger.getLogger( ChunkList.class ); + private static final Logger LOGGER = LogManager.getLogger( ChunkList.class ); /** * Here we keep a list of all chunks in the proper order, in case we quickly need to access one @@ -91,7 +93,7 @@ public class ChunkList * Get CRC32 list in DNBD3 format. All checksums are little * endian and prefixed by the crc32 sum of the list itself. */ - public synchronized byte[] getDnbd3Crc32List() throws IOException + public synchronized byte[] getDnbd3Crc32List() throws IllegalStateException { byte buffer[] = new byte[ allChunks.size() * 4 + 4 ]; // 4 byte per chunk plus master long nextChunkOffset = 0; @@ -142,7 +144,6 @@ public class ChunkList * Returns true if this list contains a chunk with state MISSING, * which means the chunk doesn't have a sha1 known to exist in * another image. - * @return */ public synchronized boolean hasLocallyMissingChunk() { @@ -204,7 +205,7 @@ public class ChunkList missingChunks.addAll( append ); } } catch ( Exception e ) { - LOGGER.warn( "chunk clone list if messed up", e ); + LOGGER.warn( "chunk clone list is messed up", e ); } } } @@ -414,6 +415,35 @@ public class ChunkList return sb.toString(); } + public synchronized String getStats() + { + int complete = 0, copying = 0, hashing = 0, missing = 0, qfc = 0, uploading = 0; + for ( FileChunk chunk : allChunks ) { + switch ( chunk.status ) { + case COMPLETE: + complete++; + break; + case COPYING: + copying++; + break; + case HASHING: + hashing++; + break; + case MISSING: + missing++; + break; + case QUEUED_FOR_COPY: + qfc++; + break; + case UPLOADING: + uploading++; + break; + } + } + return "(" + allChunks.size() + ":" + completeChunks.size() + "/" + pendingChunks.size() + "/" + missingChunks.size() + ")" + + " (" + complete + "/" + copying + "/" + hashing + "/" + missing + "/" + qfc + "/" + uploading + ")"; + } + public synchronized boolean isEmpty() { return allChunks.isEmpty(); @@ -491,4 +521,21 @@ public class ChunkList return chunk.sha1sum != null && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, chunk.sha1sum ); } + /** + * Write DNBD3 CRC32 list to given file. + * + * @throws IllegalStateException + * @throws IOException + */ + public void writeCrc32List( String crcfile ) throws IllegalStateException, IOException + { + byte[] dnbd3Crc32List = null; + dnbd3Crc32List = getDnbd3Crc32List(); + if ( dnbd3Crc32List != null ) { + try ( FileOutputStream fos = new FileOutputStream( crcfile ) ) { + fos.write( dnbd3Crc32List ); + } + } + } + } diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java index 6594e31..99b30ea 100644 --- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java +++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java @@ -5,14 +5,15 @@ import java.util.Iterator; import java.util.List; import java.util.zip.CRC32; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.filetransfer.FileRange; import org.openslx.filetransfer.LocalChunkSource.ChunkSource; public class FileChunk { - private static final Logger LOGGER = Logger.getLogger( FileChunk.class ); + private static final Logger LOGGER = LogManager.getLogger( FileChunk.class ); /** * Length in bytes of binary sha1 representation diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index f6b27f7..abbcd35 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -9,15 +9,18 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class HashChecker { public static final int BLOCKING = 1; - public static final int CALC_HASH = 2; + public static final int CHECK_SHA1 = 2; public static final int CALC_CRC32 = 4; - - private static final Logger LOGGER = Logger.getLogger( HashChecker.class ); + public static final int CALC_SHA1 = 8; + public static final int NO_SLOW_WARN = 16; + + private static final Logger LOGGER = LogManager.getLogger( HashChecker.class ); private final BlockingQueue<HashTask> queue; @@ -26,7 +29,7 @@ public class HashChecker private final String algorithm; private boolean invalid = false; - + private final int queueCapacity; public HashChecker( String algorithm ) throws NoSuchAlgorithmException @@ -96,11 +99,12 @@ public class HashChecker public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, int flags ) throws InterruptedException { boolean blocking = ( flags & BLOCKING ) != 0; - boolean doHash = ( flags & CALC_HASH ) != 0; - boolean doCrc32 = ( flags & CALC_CRC32 ) != 0; - if ( doHash && chunk.getSha1Sum() == null ) + boolean checkSha1 = ( flags & CHECK_SHA1 ) != 0; + boolean calcCrc32 = ( flags & CALC_CRC32 ) != 0; + boolean calcSha1 = ( flags & CALC_SHA1 ) != 0; + if ( checkSha1 && chunk.getSha1Sum() == null ) throw new NullPointerException( "Chunk has no sha1 hash" ); - HashTask task = new HashTask( data, chunk, callback, doHash, doCrc32 ); + HashTask task = new HashTask( data, chunk, callback, checkSha1, calcCrc32, calcSha1 ); synchronized ( threads ) { if ( invalid ) { execCallback( task, HashResult.FAILURE ); @@ -132,11 +136,18 @@ public class HashChecker } } } - if ( doHash ) { + if ( checkSha1 ) { chunk.setStatus( ChunkStatus.HASHING ); } if ( blocking ) { + long pre = System.currentTimeMillis(); queue.put( task ); + if ( ( flags & NO_SLOW_WARN ) == 0 ) { + long duration = System.currentTimeMillis() - pre; + if ( duration > 1000 ) { + LOGGER.warn( "HashChecker.queue() took " + duration + "ms" ); + } + } } else { if ( !queue.offer( task ) ) { return false; @@ -152,7 +163,7 @@ public class HashChecker { return queue.size(); } - + public int getQueueCapacity() { return queueCapacity; @@ -202,15 +213,19 @@ public class HashChecker break; } HashResult result = HashResult.NONE; - if ( task.doHash ) { + if ( task.checkSha1 || task.calcSha1 ) { // Calculate digest - md.update( task.data, 0, task.chunk.range.getLength() ); - byte[] digest = md.digest(); - result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; + md.update( task.data, 0, task.chunk.range.getLength() ); + byte[] digest = md.digest(); + if ( task.checkSha1 ) { + result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; + } else { + task.chunk.setSha1Sum( digest ); + } } - if ( task.doCrc32 ) { - // Calculate CRC32 - task.chunk.calculateDnbd3Crc32( task.data ); + if ( task.calcCrc32 ) { + // Calculate CRC32 + task.chunk.calculateDnbd3Crc32( task.data ); } execCallback( task, result ); } @@ -223,7 +238,7 @@ public class HashChecker public static enum HashResult { - NONE, // No hashing tool place + NONE, // No hashing took place VALID, // Hash matches INVALID, // Hash does not match FAILURE // Error calculating hash @@ -234,16 +249,18 @@ public class HashChecker public final byte[] data; public final FileChunk chunk; public final HashCheckCallback callback; - public final boolean doHash; - public final boolean doCrc32; + public final boolean checkSha1; + public final boolean calcCrc32; + public final boolean calcSha1; - public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean doHash, boolean doCrc32 ) + public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean checkSha1, boolean calcCrc32, boolean calcSha1 ) { this.data = data; this.chunk = chunk; this.callback = callback; - this.doHash = doHash; - this.doCrc32 = doCrc32; + this.checkSha1 = checkSha1; + this.calcCrc32 = calcCrc32; + this.calcSha1 = calcSha1; } } diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 8a69020..5cca7b8 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -14,7 +14,8 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.filetransfer.DataReceivedCallback; @@ -30,7 +31,7 @@ import org.openslx.util.ThriftUtil; public abstract class IncomingTransferBase extends AbstractTransfer implements HashCheckCallback { - private static final Logger LOGGER = Logger.getLogger( IncomingTransferBase.class ); + private static final Logger LOGGER = LogManager.getLogger( IncomingTransferBase.class ); /** * Remote peer is uploading, so on our end, we have Downloaders @@ -153,7 +154,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H @Override public final int getActiveConnectionCount() { - return downloads.size(); + synchronized ( downloads ) { + return downloads.size(); + } } public final boolean hashesEqual( List<ByteBuffer> blockHashes ) @@ -215,11 +218,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H public void updateBlockHashList( List<byte[]> hashList ) { if ( state != TransferState.IDLE && state != TransferState.WORKING ) { - LOGGER.debug( this.getId() + ": Rejecting block hash list in state " + state ); + LOGGER.info( this.getId() + ": Rejecting block hash list in state " + state ); return; } if ( hashList == null ) { - LOGGER.debug( this.getId() + ": Rejecting null block hash list" ); + LOGGER.info( this.getId() + ": Rejecting null block hash list" ); return; } int firstNew = chunks.updateSha1Sums( hashList ); @@ -249,7 +252,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H continue; } try { - if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) { // false == queue full, stop + if ( !hashChecker.queue( chunk, data, this, HashChecker.CHECK_SHA1 ) ) { // false == queue full, stop chunks.markCompleted( chunk, false ); break; } @@ -286,6 +289,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( sources != null && !sources.isEmpty() ) { chunks.markLocalCopyCandidates( sources ); } + if ( state == TransferState.IDLE ) { + state = TransferState.WORKING; + } localCopyManager.trigger(); } @@ -389,7 +395,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H currentChunk = chunks.getMissing(); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); - cancel(); + LOGGER.info( "Incoming transfer connection was interrupted" ); return null; } if ( currentChunk == null ) { @@ -429,7 +435,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H InterruptedException passEx = null; if ( hashChecker != null && currentChunk.getSha1Sum() != null ) { try { - hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH ); + hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CHECK_SHA1 ); return true; } catch ( InterruptedException e ) { passEx = e; @@ -437,7 +443,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } // We have no hash checker, or hasher rejected block, // or the hash for the current chunk is unknown - flush to disk + long pre = System.currentTimeMillis(); writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer ); + long duration = System.currentTimeMillis() - pre; + if ( duration > 2000 ) { + LOGGER.warn( "Writing chunk to disk before hash check took " + duration + "ms. Storage backend overloaded?" ); + } chunks.markCompleted( currentChunk, false ); chunkStatusChanged( currentChunk ); if ( passEx != null ) @@ -463,6 +474,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H @Override public void run() { + int active; try { CbHandler cbh = new CbHandler( connection ); if ( connection.download( cbh, cbh ) ) { @@ -481,7 +493,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } chunkStatusChanged( cbh.currentChunk ); } - LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" ); + LOGGER.info( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped prematurely" ); } if ( state != TransferState.FINISHED && state != TransferState.ERROR ) { lastActivityTime.set( System.currentTimeMillis() ); @@ -489,6 +501,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } finally { synchronized ( downloads ) { downloads.remove( connection ); + active = downloads.size(); } } if ( chunks.isComplete() ) { @@ -499,6 +512,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( localCopyManager != null ) { localCopyManager.trigger(); } + LOGGER.info( "Downloader disconnected, " + active + " still running. " + chunks.getStats() ); + } else { + LOGGER.info( "Downloader disconnected, state=" + state + ". " + chunks.getStats() ); } } } ); @@ -563,7 +579,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk ) { if ( state != TransferState.IDLE && state != TransferState.WORKING ) { - LOGGER.debug( "hashCheckDone called in bad state " + state.name() ); + LOGGER.warn( "hashCheckDone called in bad state " + state.name() ); return; } switch ( result ) { @@ -576,7 +592,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunks.markCompleted( chunk, true ); } else { try { + long pre = System.currentTimeMillis(); writeFileData( chunk.range.startOffset, chunk.range.getLength(), data ); + long duration = System.currentTimeMillis() - pre; + if ( duration > 2000 ) { + LOGGER.warn( "Writing chunk to disk after hash check took " + duration + "ms. Storage backend overloaded?" ); + } chunks.markCompleted( chunk, true ); } catch ( Exception e ) { LOGGER.warn( "Cannot write to file after hash check", e ); @@ -600,7 +621,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } // A block finished, see if we can queue a new one queueUnhashedChunk( false ); - if ( localCopyManager != null ) { + if ( localCopyManager != null && localCopyManager.isAlive() ) { localCopyManager.trigger(); } } @@ -617,7 +638,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H try { data = loadChunkFromFile( chunk ); } catch ( EOFException e1 ) { - LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking is invalid." ); + LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking as invalid." ); chunks.markFailed( chunk ); chunkStatusChanged( chunk ); return; @@ -629,7 +650,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return; } try { - int flags = HashChecker.CALC_HASH; + int flags = HashChecker.CHECK_SHA1; if ( blocking ) { flags |= HashChecker.BLOCKING; } @@ -645,7 +666,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H final synchronized void finishUploadInternal() { - if ( state == TransferState.FINISHED ) { + if ( state == TransferState.FINISHED || state == TransferState.ERROR ) { return; } try { @@ -659,17 +680,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( localCopyManager != null ) { localCopyManager.interrupt(); } - if ( state != TransferState.WORKING ) { + state = TransferState.FINISHED; // Races... + if ( !finishIncomingTransfer() ) { state = TransferState.ERROR; - } else { - state = TransferState.FINISHED; // Races... - if ( !finishIncomingTransfer() ) { - state = TransferState.ERROR; - } } } - protected HashChecker getHashChecker() + public static HashChecker getHashChecker() { return hashChecker; } diff --git a/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java index 54dd2d0..e1fad97 100644 --- a/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java +++ b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java @@ -9,7 +9,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.filetransfer.LocalChunkSource.ChunkSource; import org.openslx.filetransfer.LocalChunkSource.SourceFile; import org.openslx.util.Util; @@ -17,7 +18,7 @@ import org.openslx.util.Util; public class LocalCopyManager extends Thread { - private static final Logger LOGGER = Logger.getLogger( LocalCopyManager.class ); + private static final Logger LOGGER = LogManager.getLogger( LocalCopyManager.class ); private FileChunk currentChunk = null; diff --git a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java index 18296c5..ad2e96c 100644 --- a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java @@ -6,7 +6,8 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.filetransfer.Uploader; @@ -17,7 +18,7 @@ public abstract class OutgoingTransferBase extends AbstractTransfer * Constants */ - private static final Logger LOGGER = Logger.getLogger( OutgoingTransferBase.class ); + private static final Logger LOGGER = LogManager.getLogger( OutgoingTransferBase.class ); private static final long INACTIVITY_TIMEOUT = TimeUnit.MINUTES.toMillis( 5 ); @@ -74,9 +75,13 @@ public abstract class OutgoingTransferBase extends AbstractTransfer @Override public void run() { - boolean ret = connection.upload( sourceFile.getAbsolutePath() ); - synchronized ( uploads ) { - uploads.remove( connection ); + boolean ret = false; + try { + ret = connection.upload( sourceFile.getAbsolutePath() ); + } finally { + synchronized ( uploads ) { + uploads.remove( connection ); + } } if ( ret ) { connectFails.set( 0 ); |