From e8873c0718b39666a87086d1e0a8a35c51003a76 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 13 Apr 2018 10:10:49 +0200 Subject: Filetransfer: Support calculating dnbd3 crc32 list --- .../org/openslx/filetransfer/util/ChunkList.java | 32 +++++++++++++++++ .../org/openslx/filetransfer/util/FileChunk.java | 32 +++++++++++++++++ .../org/openslx/filetransfer/util/HashChecker.java | 42 ++++++++++++++++------ .../filetransfer/util/IncomingTransferBase.java | 23 ++++++++++-- .../filetransfer/util/StandaloneFileChunk.java | 16 +++++++++ 5 files changed, 132 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/openslx/filetransfer/util/StandaloneFileChunk.java diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index c692499..cd1bc69 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -1,5 +1,6 @@ package org.openslx.filetransfer.util; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -7,6 +8,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.zip.CRC32; import org.apache.log4j.Logger; import org.openslx.util.ThriftUtil; @@ -73,6 +75,36 @@ public class ChunkList } } + /** + * Get CRC32 list in DNBD3 format. All checksums are little + * endian and prefixed by the crc32 sum of the list itself. + */ + public synchronized byte[] getDnbd3Crc32List() throws IOException + { + byte buffer[] = new byte[ allChunks.size() * 4 + 4 ]; // 4 byte per chunk plus master + long nextChunkOffset = 0; + int nextCrcArrayPos = 4; + for ( FileChunk c : allChunks ) { + if ( c.crc32 == null ) { + throw new IllegalStateException( "Called on ChunkList that doesn't have crc32 enabled" ); + } + if ( c.range.startOffset != nextChunkOffset ) { + throw new IllegalStateException( "Chunk list is not in order or has wrong chunk size" ); + } + nextChunkOffset += FileChunk.CHUNK_SIZE; + c.getCrc32Le( buffer, nextCrcArrayPos ); + nextCrcArrayPos += 4; + } + CRC32 masterCrc = new CRC32(); + masterCrc.update( buffer, 4, buffer.length - 4 ); + int value = (int)masterCrc.getValue(); + buffer[3] = (byte) ( value >>> 24 ); + buffer[2] = (byte) ( value >>> 16 ); + buffer[1] = (byte) ( value >>> 8 ); + buffer[0] = (byte)value; + return buffer; + } + /** * Get a missing chunk, marking it pending. * diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java index e00b011..6450af2 100644 --- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java +++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java @@ -2,6 +2,7 @@ package org.openslx.filetransfer.util; import java.util.Iterator; import java.util.List; +import java.util.zip.CRC32; import org.openslx.filetransfer.FileRange; @@ -18,6 +19,7 @@ public class FileChunk public final FileRange range; private int failCount = 0; protected byte[] sha1sum; + protected CRC32 crc32; protected ChunkStatus status = ChunkStatus.MISSING; private boolean writtenToDisk = false; @@ -73,6 +75,36 @@ public class FileChunk return status; } + public synchronized void calculateDnbd3Crc32( byte[] data ) + { + // As this is usually called before we validated the sha1, handle the case where + // this gets called multiple times and only remember the last result + if ( crc32 == null ) { + crc32 = new CRC32(); + } else { + crc32.reset(); + } + int chunkLength = range.getLength(); + crc32.update( data, 0, chunkLength ); + if ( ( chunkLength % 4096 ) != 0 ) { + // DNBD3 virtually pads all images to be a multiple of 4KiB in size, + // so simulate that here too + byte[] padding = new byte[ 4096 - ( chunkLength % 4096 ) ]; + crc32.update( padding ); + } + } + + public synchronized void getCrc32Le( byte[] buffer, int offset ) + { + if ( crc32 == null ) + throw new IllegalStateException( "Trying to get CRC32 on Chunk that doesn't have one" ); + int value = (int)crc32.getValue(); + buffer[offset + 3] = (byte) ( value >>> 24 ); + buffer[offset + 2] = (byte) ( value >>> 16 ); + buffer[offset + 1] = (byte) ( value >>> 8 ); + buffer[offset + 0] = (byte)value; + } + /** * Whether the chunk of data this chunk refers to has been written to * disk and is assumed to be valid/up to date. diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index b9b62b1..273bc7e 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -12,6 +12,10 @@ import org.apache.log4j.Logger; public class HashChecker { + public static final int BLOCKING = 1; + public static final int CALC_HASH = 2; + public static final int CALC_CRC32 = 4; + private static final Logger LOGGER = Logger.getLogger( HashChecker.class ); private final BlockingQueue queue; @@ -69,6 +73,8 @@ public class HashChecker private void execCallback( HashTask task, HashResult result ) { + if ( task.callback == null ) + return; try { task.callback.hashCheckDone( result, task.data, task.chunk ); } catch ( Throwable t ) { @@ -85,12 +91,14 @@ public class HashChecker * @return true if the chunk was handled, false if the queue was full and rejected the chunk. * @throws InterruptedException */ - public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, boolean blocking ) throws InterruptedException + public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, int flags ) throws InterruptedException { - byte[] sha1Sum = chunk.getSha1Sum(); - if ( sha1Sum == null ) + boolean blocking = ( flags & BLOCKING ) != 0; + boolean doHash = ( flags & CALC_HASH ) != 0; + boolean doCrc32 = ( flags & CALC_CRC32 ) != 0; + if ( doHash && chunk.getSha1Sum() == null ) throw new NullPointerException( "Chunk has no sha1 hash" ); - HashTask task = new HashTask( data, chunk, callback ); + HashTask task = new HashTask( data, chunk, callback, doHash, doCrc32 ); synchronized ( threads ) { if ( invalid ) { execCallback( task, HashResult.FAILURE ); @@ -106,7 +114,9 @@ public class HashChecker } } } - chunk.setStatus( ChunkStatus.HASHING ); + if ( doHash ) { + chunk.setStatus( ChunkStatus.HASHING ); + } if ( blocking ) { queue.put( task ); } else { @@ -153,10 +163,17 @@ public class HashChecker LOGGER.info( "Interrupted while waiting for hash task", e ); break; } - // Calculate digest - md.update( task.data, 0, task.chunk.range.getLength() ); - byte[] digest = md.digest(); - HashResult result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; + HashResult result = HashResult.NONE; + if ( task.doHash ) { + // Calculate digest + md.update( task.data, 0, task.chunk.range.getLength() ); + byte[] digest = md.digest(); + result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; + } + if ( task.doCrc32 ) { + // Calculate CRC32 + task.chunk.calculateDnbd3Crc32( task.data ); + } execCallback( task, result ); if ( extraThread && queue.isEmpty() ) { break; @@ -173,6 +190,7 @@ public class HashChecker public static enum HashResult { + NONE, // No hashing tool place VALID, // Hash matches INVALID, // Hash does not match FAILURE // Error calculating hash @@ -183,12 +201,16 @@ public class HashChecker public final byte[] data; public final FileChunk chunk; public final HashCheckCallback callback; + public final boolean doHash; + public final boolean doCrc32; - public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback ) + public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean doHash, boolean doCrc32 ) { this.data = data; this.chunk = chunk; this.callback = callback; + this.doHash = doHash; + this.doCrc32 = doCrc32; } } diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 4135ca7..b298c04 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -214,7 +214,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H continue; } try { - if ( !hashChecker.queue( chunk, data, this, false ) ) { // false == queue full, stop + if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) { // false == queue full, stop chunks.markCompleted( chunk, false ); break; } @@ -285,9 +285,10 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H public FileRange get() { if ( currentChunk != null ) { + chunkReceived( currentChunk, buffer ); if ( hashChecker != null && currentChunk.getSha1Sum() != null ) { try { - hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, true ); + hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH ); } catch ( InterruptedException e ) { chunks.markCompleted( currentChunk, false ); currentChunk = null; @@ -515,7 +516,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return; } try { - if ( !hashChecker.queue( chunk, data, this, blocking ) ) { + int flags = HashChecker.CALC_HASH; + if ( blocking ) { + flags |= HashChecker.BLOCKING; + } + if ( !hashChecker.queue( chunk, data, this, flags ) ) { chunks.markCompleted( chunk, false ); } } catch ( InterruptedException e ) { @@ -540,6 +545,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } } } + + protected HashChecker getHashChecker() + { + return hashChecker; + } /* * @@ -559,4 +569,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H protected abstract void chunkStatusChanged( FileChunk chunk ); + /** + * Called when a chunk has been received -- no validation has taken place yet + */ + protected void chunkReceived( FileChunk chunk, byte[] data ) + { + } + } diff --git a/src/main/java/org/openslx/filetransfer/util/StandaloneFileChunk.java b/src/main/java/org/openslx/filetransfer/util/StandaloneFileChunk.java new file mode 100644 index 0000000..cc47a8e --- /dev/null +++ b/src/main/java/org/openslx/filetransfer/util/StandaloneFileChunk.java @@ -0,0 +1,16 @@ +package org.openslx.filetransfer.util; + +public class StandaloneFileChunk extends FileChunk +{ + + public StandaloneFileChunk( long startOffset, long endOffset, byte[] sha1sum ) + { + super( startOffset, endOffset, sha1sum ); + } + + public void overrideStatus(ChunkStatus status) + { + this.status = status; + } + +} -- cgit v1.2.3-55-g7522