summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2018-04-13 10:10:49 +0200
committerSimon Rettberg2018-04-13 10:10:49 +0200
commite8873c0718b39666a87086d1e0a8a35c51003a76 (patch)
tree151b65dddd54361ce442517f51aeef3e2fb4ca1a
parentadded mountpoint/drive and displaynames to netshare (diff)
downloadmaster-sync-shared-e8873c0718b39666a87086d1e0a8a35c51003a76.tar.gz
master-sync-shared-e8873c0718b39666a87086d1e0a8a35c51003a76.tar.xz
master-sync-shared-e8873c0718b39666a87086d1e0a8a35c51003a76.zip
Filetransfer: Support calculating dnbd3 crc32 list
-rw-r--r--src/main/java/org/openslx/filetransfer/util/ChunkList.java32
-rw-r--r--src/main/java/org/openslx/filetransfer/util/FileChunk.java32
-rw-r--r--src/main/java/org/openslx/filetransfer/util/HashChecker.java42
-rw-r--r--src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java23
-rw-r--r--src/main/java/org/openslx/filetransfer/util/StandaloneFileChunk.java16
5 files changed, 132 insertions, 13 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index c692499..cd1bc69 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -1,5 +1,6 @@
package org.openslx.filetransfer.util;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -7,6 +8,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.zip.CRC32;
import org.apache.log4j.Logger;
import org.openslx.util.ThriftUtil;
@@ -74,6 +76,36 @@ public class ChunkList
}
/**
+ * Get CRC32 list in DNBD3 format. All checksums are little
+ * endian and prefixed by the crc32 sum of the list itself.
+ */
+ public synchronized byte[] getDnbd3Crc32List() throws IOException
+ {
+ byte buffer[] = new byte[ allChunks.size() * 4 + 4 ]; // 4 byte per chunk plus master
+ long nextChunkOffset = 0;
+ int nextCrcArrayPos = 4;
+ for ( FileChunk c : allChunks ) {
+ if ( c.crc32 == null ) {
+ throw new IllegalStateException( "Called on ChunkList that doesn't have crc32 enabled" );
+ }
+ if ( c.range.startOffset != nextChunkOffset ) {
+ throw new IllegalStateException( "Chunk list is not in order or has wrong chunk size" );
+ }
+ nextChunkOffset += FileChunk.CHUNK_SIZE;
+ c.getCrc32Le( buffer, nextCrcArrayPos );
+ nextCrcArrayPos += 4;
+ }
+ CRC32 masterCrc = new CRC32();
+ masterCrc.update( buffer, 4, buffer.length - 4 );
+ int value = (int)masterCrc.getValue();
+ buffer[3] = (byte) ( value >>> 24 );
+ buffer[2] = (byte) ( value >>> 16 );
+ buffer[1] = (byte) ( value >>> 8 );
+ buffer[0] = (byte)value;
+ return buffer;
+ }
+
+ /**
* Get a missing chunk, marking it pending.
*
* @return chunk marked as missing
diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
index e00b011..6450af2 100644
--- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java
+++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
@@ -2,6 +2,7 @@ package org.openslx.filetransfer.util;
import java.util.Iterator;
import java.util.List;
+import java.util.zip.CRC32;
import org.openslx.filetransfer.FileRange;
@@ -18,6 +19,7 @@ public class FileChunk
public final FileRange range;
private int failCount = 0;
protected byte[] sha1sum;
+ protected CRC32 crc32;
protected ChunkStatus status = ChunkStatus.MISSING;
private boolean writtenToDisk = false;
@@ -73,6 +75,36 @@ public class FileChunk
return status;
}
+ public synchronized void calculateDnbd3Crc32( byte[] data )
+ {
+ // As this is usually called before we validated the sha1, handle the case where
+ // this gets called multiple times and only remember the last result
+ if ( crc32 == null ) {
+ crc32 = new CRC32();
+ } else {
+ crc32.reset();
+ }
+ int chunkLength = range.getLength();
+ crc32.update( data, 0, chunkLength );
+ if ( ( chunkLength % 4096 ) != 0 ) {
+ // DNBD3 virtually pads all images to be a multiple of 4KiB in size,
+ // so simulate that here too
+ byte[] padding = new byte[ 4096 - ( chunkLength % 4096 ) ];
+ crc32.update( padding );
+ }
+ }
+
+ public synchronized void getCrc32Le( byte[] buffer, int offset )
+ {
+ if ( crc32 == null )
+ throw new IllegalStateException( "Trying to get CRC32 on Chunk that doesn't have one" );
+ int value = (int)crc32.getValue();
+ buffer[offset + 3] = (byte) ( value >>> 24 );
+ buffer[offset + 2] = (byte) ( value >>> 16 );
+ buffer[offset + 1] = (byte) ( value >>> 8 );
+ buffer[offset + 0] = (byte)value;
+ }
+
/**
* Whether the chunk of data this chunk refers to has been written to
* disk and is assumed to be valid/up to date.
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
index b9b62b1..273bc7e 100644
--- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java
+++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
@@ -12,6 +12,10 @@ import org.apache.log4j.Logger;
public class HashChecker
{
+ public static final int BLOCKING = 1;
+ public static final int CALC_HASH = 2;
+ public static final int CALC_CRC32 = 4;
+
private static final Logger LOGGER = Logger.getLogger( HashChecker.class );
private final BlockingQueue<HashTask> queue;
@@ -69,6 +73,8 @@ public class HashChecker
private void execCallback( HashTask task, HashResult result )
{
+ if ( task.callback == null )
+ return;
try {
task.callback.hashCheckDone( result, task.data, task.chunk );
} catch ( Throwable t ) {
@@ -85,12 +91,14 @@ public class HashChecker
* @return true if the chunk was handled, false if the queue was full and rejected the chunk.
* @throws InterruptedException
*/
- public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, boolean blocking ) throws InterruptedException
+ public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, int flags ) throws InterruptedException
{
- byte[] sha1Sum = chunk.getSha1Sum();
- if ( sha1Sum == null )
+ boolean blocking = ( flags & BLOCKING ) != 0;
+ boolean doHash = ( flags & CALC_HASH ) != 0;
+ boolean doCrc32 = ( flags & CALC_CRC32 ) != 0;
+ if ( doHash && chunk.getSha1Sum() == null )
throw new NullPointerException( "Chunk has no sha1 hash" );
- HashTask task = new HashTask( data, chunk, callback );
+ HashTask task = new HashTask( data, chunk, callback, doHash, doCrc32 );
synchronized ( threads ) {
if ( invalid ) {
execCallback( task, HashResult.FAILURE );
@@ -106,7 +114,9 @@ public class HashChecker
}
}
}
- chunk.setStatus( ChunkStatus.HASHING );
+ if ( doHash ) {
+ chunk.setStatus( ChunkStatus.HASHING );
+ }
if ( blocking ) {
queue.put( task );
} else {
@@ -153,10 +163,17 @@ public class HashChecker
LOGGER.info( "Interrupted while waiting for hash task", e );
break;
}
- // Calculate digest
- md.update( task.data, 0, task.chunk.range.getLength() );
- byte[] digest = md.digest();
- HashResult result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
+ HashResult result = HashResult.NONE;
+ if ( task.doHash ) {
+ // Calculate digest
+ md.update( task.data, 0, task.chunk.range.getLength() );
+ byte[] digest = md.digest();
+ result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
+ }
+ if ( task.doCrc32 ) {
+ // Calculate CRC32
+ task.chunk.calculateDnbd3Crc32( task.data );
+ }
execCallback( task, result );
if ( extraThread && queue.isEmpty() ) {
break;
@@ -173,6 +190,7 @@ public class HashChecker
public static enum HashResult
{
+ NONE, // No hashing tool place
VALID, // Hash matches
INVALID, // Hash does not match
FAILURE // Error calculating hash
@@ -183,12 +201,16 @@ public class HashChecker
public final byte[] data;
public final FileChunk chunk;
public final HashCheckCallback callback;
+ public final boolean doHash;
+ public final boolean doCrc32;
- public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback )
+ public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean doHash, boolean doCrc32 )
{
this.data = data;
this.chunk = chunk;
this.callback = callback;
+ this.doHash = doHash;
+ this.doCrc32 = doCrc32;
}
}
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index 4135ca7..b298c04 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -214,7 +214,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
continue;
}
try {
- if ( !hashChecker.queue( chunk, data, this, false ) ) { // false == queue full, stop
+ if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) { // false == queue full, stop
chunks.markCompleted( chunk, false );
break;
}
@@ -285,9 +285,10 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
public FileRange get()
{
if ( currentChunk != null ) {
+ chunkReceived( currentChunk, buffer );
if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
try {
- hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, true );
+ hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH );
} catch ( InterruptedException e ) {
chunks.markCompleted( currentChunk, false );
currentChunk = null;
@@ -515,7 +516,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
return;
}
try {
- if ( !hashChecker.queue( chunk, data, this, blocking ) ) {
+ int flags = HashChecker.CALC_HASH;
+ if ( blocking ) {
+ flags |= HashChecker.BLOCKING;
+ }
+ if ( !hashChecker.queue( chunk, data, this, flags ) ) {
chunks.markCompleted( chunk, false );
}
} catch ( InterruptedException e ) {
@@ -540,6 +545,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
}
}
+
+ protected HashChecker getHashChecker()
+ {
+ return hashChecker;
+ }
/*
*
@@ -559,4 +569,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
protected abstract void chunkStatusChanged( FileChunk chunk );
+ /**
+ * Called when a chunk has been received -- no validation has taken place yet
+ */
+ protected void chunkReceived( FileChunk chunk, byte[] data )
+ {
+ }
+
}
diff --git a/src/main/java/org/openslx/filetransfer/util/StandaloneFileChunk.java b/src/main/java/org/openslx/filetransfer/util/StandaloneFileChunk.java
new file mode 100644
index 0000000..cc47a8e
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/util/StandaloneFileChunk.java
@@ -0,0 +1,16 @@
+package org.openslx.filetransfer.util;
+
+public class StandaloneFileChunk extends FileChunk
+{
+
+ public StandaloneFileChunk( long startOffset, long endOffset, byte[] sha1sum )
+ {
+ super( startOffset, endOffset, sha1sum );
+ }
+
+ public void overrideStatus(ChunkStatus status)
+ {
+ this.status = status;
+ }
+
+}