summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer
diff options
context:
space:
mode:
authorSimon Rettberg2016-04-18 15:18:05 +0200
committerSimon Rettberg2016-04-18 15:18:05 +0200
commitcc70f09431deb7937e01cc6583884fb5067a2994 (patch)
treefcf7c8720a4479b09e07c82eb13f7015bb4d0533 /src/main/java/org/openslx/filetransfer
parentPreparations/changes for global image sync (diff)
downloadmaster-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.tar.gz
master-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.tar.xz
master-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.zip
More additions for central image store
Diffstat (limited to 'src/main/java/org/openslx/filetransfer')
-rw-r--r--src/main/java/org/openslx/filetransfer/util/ChunkList.java98
-rw-r--r--src/main/java/org/openslx/filetransfer/util/FileChunk.java6
-rw-r--r--src/main/java/org/openslx/filetransfer/util/HashChecker.java24
-rw-r--r--src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java61
4 files changed, 174 insertions, 15 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index 1b33102..e00aa0e 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -2,12 +2,14 @@ package org.openslx.filetransfer.util;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.log4j.Logger;
+import org.openslx.util.ThriftUtil;
public class ChunkList
{
@@ -33,7 +35,7 @@ public class ChunkList
// 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying, 5 = hashing
private final ByteBuffer statusArray;
-
+
/**
* True if at least one block has a checksum set
*/
@@ -121,6 +123,35 @@ public class ChunkList
}
/**
+ * Set status of blocks according to given "ismissing" list. Intended to be called
+ * right after creating the list, in case we have a local file already and want to
+ * resume downloading.
+ */
+ public synchronized void resumeFromStatusList( List<Boolean> statusList, long fileLength )
+ {
+ if ( !completeChunks.isEmpty() || !pendingChunks.isEmpty() ) {
+ LOGGER.warn( "Inconsistent state: resume called when not all chunks are marked missing" );
+ }
+ int index = 0;
+ for ( Boolean missing : statusList ) {
+ FileChunk chunk = allChunks.get( index );
+ if ( fileLength != 0 && fileLength < chunk.range.endOffset )
+ break; // Stop, file is shorter than end of this chunk
+ if ( missingChunks.remove( chunk ) || pendingChunks.remove( chunk ) ) {
+ completeChunks.add( chunk );
+ }
+ if ( missing ) {
+ // Trigger hashing
+ chunk.setStatus( ChunkStatus.HASHING );
+ } else {
+ // Assume complete
+ chunk.setStatus( ChunkStatus.COMPLETE );
+ }
+ index++;
+ }
+ }
+
+ /**
* Get a chunk that is marked complete, has a sha1 hash, but has not been hash-checked yet.
*
* @return chunk
@@ -166,7 +197,7 @@ public class ChunkList
public synchronized int markFailed( FileChunk c )
{
if ( !pendingChunks.remove( c ) ) {
- LOGGER.warn( "Inconsistent state: markTransferred called for Chunk " + c.toString()
+ LOGGER.warn( "Inconsistent state: markFailed called for Chunk " + c.toString()
+ ", but chunk is not marked as currently transferring!" );
return -1;
}
@@ -178,6 +209,25 @@ public class ChunkList
}
/**
+ * Mark a missing chunk as complete.
+ */
+ private synchronized boolean markMissingAsComplete( int index )
+ {
+ FileChunk chunk = allChunks.get( index );
+ if ( completeChunks.contains( chunk ) )
+ return true;
+ if ( !missingChunks.remove( chunk ) ) {
+ LOGGER.warn( "Inconsistent state: markMissingAsComplete called for chunk " + chunk.toString() + " (indexed as " + index
+ + ") which is not missing" );
+ return false;
+ }
+ chunk.setStatus( ChunkStatus.COMPLETE );
+ completeChunks.add( chunk );
+ this.notifyAll();
+ return true;
+ }
+
+ /**
* Check if all blocks in this list are marked as successfully transfered. If a complete chunk is
* marked as "hashing", or if there are some complete chunks without a sha1sum and some with a
* sha1sum, the transfer is considered incomplete.
@@ -259,4 +309,48 @@ public class ChunkList
return allChunks;
}
+ public static boolean hashListsEqualFcBb( List<FileChunk> one, List<ByteBuffer> two )
+ {
+ return hashListsEqualFcArray( one, ThriftUtil.unwrapByteBufferList( two ) );
+ }
+
+ public static boolean hashListsEqualFcArray( List<FileChunk> one, List<byte[]> two )
+ {
+ if ( one.size() != two.size() )
+ return false;
+ FileChunk first = one.get( 0 );
+ if ( first == null || first.getSha1Sum() == null )
+ return false;
+ Iterator<byte[]> it = two.iterator();
+ for ( FileChunk existingChunk : one ) {
+ byte[] testChunk = it.next();
+ if ( !Arrays.equals( testChunk, existingChunk.getSha1Sum() ) )
+ return false;
+ }
+ return true;
+ }
+
+ public static boolean hashListsEqualBbBb( List<ByteBuffer> list1, List<ByteBuffer> list2 )
+ {
+ return hashListsEqualBbArray( list1, ThriftUtil.unwrapByteBufferList( list2 ) );
+ }
+
+ public static boolean hashListsEqualBbArray( List<ByteBuffer> bufferList, List<byte[]> arrayList )
+ {
+ return hashListsEqualArray( ThriftUtil.unwrapByteBufferList( bufferList ), arrayList );
+ }
+
+ public static boolean hashListsEqualArray( List<byte[]> list1, List<byte[]> list2 )
+ {
+ if ( list1.size() != list2.size() )
+ return false;
+ Iterator<byte[]> it1 = list1.iterator();
+ Iterator<byte[]> it2 = list2.iterator();
+ while ( it1.hasNext() && it2.hasNext() ) {
+ if ( !Arrays.equals( it1.next(), it2.next() ) )
+ return false;
+ }
+ return true;
+ }
+
}
diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
index 62f7d46..0aff296 100644
--- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java
+++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
@@ -31,7 +31,7 @@ public class FileChunk
}
}
- public synchronized void setSha1Sum( byte[] sha1sum )
+ synchronized void setSha1Sum( byte[] sha1sum )
{
if ( this.sha1sum != null || sha1sum == null || sha1sum.length != SHA1_LENGTH )
return;
@@ -47,7 +47,7 @@ public class FileChunk
*
* @return Number of times the transfer failed now
*/
- public synchronized int incFailed()
+ synchronized int incFailed()
{
return ++failCount;
}
@@ -82,7 +82,7 @@ public class FileChunk
return writtenToDisk;
}
- protected synchronized void setStatus( ChunkStatus status )
+ synchronized void setStatus( ChunkStatus status )
{
if ( status != null ) {
if ( status == ChunkStatus.COMPLETE ) {
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
index 5b647aa..d9db7df 100644
--- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java
+++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
@@ -75,7 +75,16 @@ public class HashChecker
}
}
- public void queue( FileChunk chunk, byte[] data, HashCheckCallback callback ) throws InterruptedException
+ /**
+ * Queue the given chunk for hashing. The chunk should be in pending state.
+ *
+ * @param chunk chunk to hash
+ * @param data binary data of this chunk
+ * @param callback callback to call when hashing is done
+ * @return true if the chunk was handled, false if the queue was full and rejected the chunk.
+ * @throws InterruptedException
+ */
+ public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, boolean blocking ) throws InterruptedException
{
byte[] sha1Sum = chunk.getSha1Sum();
if ( sha1Sum == null )
@@ -84,7 +93,7 @@ public class HashChecker
synchronized ( threads ) {
if ( invalid ) {
execCallback( task, HashResult.FAILURE );
- return;
+ return true;
}
if ( queue.remainingCapacity() <= 1 && threads.size() < Runtime.getRuntime().availableProcessors() ) {
try {
@@ -95,8 +104,18 @@ public class HashChecker
LOGGER.warn( "Could not create additional hash checking thread", e );
}
}
+ }
+ ChunkStatus old = chunk.getStatus();
+ chunk.setStatus( ChunkStatus.HASHING );
+ if ( blocking ) {
queue.put( task );
+ } else {
+ if ( !queue.offer( task ) ) {
+ chunk.setStatus( old );
+ return false;
+ }
}
+ return true;
}
// ############################################################# \\
@@ -171,7 +190,6 @@ public class HashChecker
this.data = data;
this.chunk = chunk;
this.callback = callback;
- chunk.setStatus( ChunkStatus.HASHING );
}
}
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index b738ef6..9406c27 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -1,5 +1,6 @@
package org.openslx.filetransfer.util;
+import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -181,16 +182,26 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( hashChecker == null )
return;
FileChunk chunk;
- while ( null != ( chunk = chunks.getUnhashedComplete() ) ) {
- byte[] data = loadChunkFromFile( chunk );
+ int cnt = 0;
+ while ( null != ( chunk = chunks.getUnhashedComplete() ) && ++cnt <= 3 ) {
+ byte[] data;
+ try {
+ data = loadChunkFromFile( chunk );
+ } catch ( EOFException e1 ) {
+ LOGGER.warn( "blockhash update: file too short, marking chunk as invalid" );
+ chunks.markFailed( chunk );
+ chunkStatusChanged( chunk );
+ continue;
+ }
if ( data == null ) {
- LOGGER.warn( "Will mark unloadable chunk as valid :-(" );
+ LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" );
chunks.markSuccessful( chunk );
chunkStatusChanged( chunk );
continue;
}
try {
- hashChecker.queue( chunk, data, this );
+ if ( !hashChecker.queue( chunk, data, this, false ) ) // false == blocked while adding, so stop
+ break;
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
return;
@@ -198,7 +209,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
}
- private byte[] loadChunkFromFile( FileChunk chunk )
+ private byte[] loadChunkFromFile( FileChunk chunk ) throws EOFException
{
synchronized ( tmpFileHandle ) {
if ( state != TransferState.IDLE && state != TransferState.WORKING )
@@ -208,6 +219,8 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
byte[] buffer = new byte[ chunk.range.getLength() ];
tmpFileHandle.readFully( buffer );
return buffer;
+ } catch ( EOFException e ) {
+ throw e;
} catch ( IOException e ) {
LOGGER.error( "Could not read chunk " + chunk.getChunkIndex() + " of File " + getTmpFileName().toString(), e );
return null;
@@ -256,7 +269,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( currentChunk != null ) {
if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
try {
- hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this );
+ hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, true );
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
return null;
@@ -341,7 +354,6 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
LOGGER.warn( "Download of " + getTmpFileName().getAbsolutePath() + " failed" );
}
if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
- LOGGER.debug( "Download from satellite complete" );
lastActivityTime.set( System.currentTimeMillis() );
}
synchronized ( downloads ) {
@@ -349,6 +361,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
if ( chunks.isComplete() ) {
finishUploadInternal();
+ } else {
+ // Keep pumping unhashed chunks into the hasher
+ queueUnhashedChunk();
}
}
} );
@@ -436,6 +451,38 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
chunkStatusChanged( chunk );
break;
}
+ // A block finished, see if we can queue a new one
+ queueUnhashedChunk();
+ }
+
+ /**
+ * Gets an unhashed chunk (if existent) and queues it for hashing
+ */
+ protected void queueUnhashedChunk()
+ {
+ FileChunk chunk = chunks.getUnhashedComplete();
+ if ( chunk == null )
+ return;
+ byte[] data;
+ try {
+ data = loadChunkFromFile( chunk );
+ } catch ( EOFException e1 ) {
+ LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking is invalid." );
+ chunks.markFailed( chunk );
+ chunkStatusChanged( chunk );
+ return;
+ }
+ if ( data == null ) {
+ LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" );
+ chunks.markSuccessful( chunk );
+ chunkStatusChanged( chunk );
+ return;
+ }
+ try {
+ hashChecker.queue( chunk, data, this, true );
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ }
}
private synchronized void finishUploadInternal()