diff options
author | Simon Rettberg | 2016-04-18 15:18:05 +0200 |
---|---|---|
committer | Simon Rettberg | 2016-04-18 15:18:05 +0200 |
commit | cc70f09431deb7937e01cc6583884fb5067a2994 (patch) | |
tree | fcf7c8720a4479b09e07c82eb13f7015bb4d0533 /src/main/java/org/openslx/filetransfer | |
parent | Preparations/changes for global image sync (diff) | |
download | master-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.tar.gz master-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.tar.xz master-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.zip |
More additions for central image store
Diffstat (limited to 'src/main/java/org/openslx/filetransfer')
4 files changed, 174 insertions, 15 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index 1b33102..e00aa0e 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -2,12 +2,14 @@ package org.openslx.filetransfer.util; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import org.apache.log4j.Logger; +import org.openslx.util.ThriftUtil; public class ChunkList { @@ -33,7 +35,7 @@ public class ChunkList // 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying, 5 = hashing private final ByteBuffer statusArray; - + /** * True if at least one block has a checksum set */ @@ -121,6 +123,35 @@ public class ChunkList } /** + * Set status of blocks according to given "ismissing" list. Intended to be called + * right after creating the list, in case we have a local file already and want to + * resume downloading. + */ + public synchronized void resumeFromStatusList( List<Boolean> statusList, long fileLength ) + { + if ( !completeChunks.isEmpty() || !pendingChunks.isEmpty() ) { + LOGGER.warn( "Inconsistent state: resume called when not all chunks are marked missing" ); + } + int index = 0; + for ( Boolean missing : statusList ) { + FileChunk chunk = allChunks.get( index ); + if ( fileLength != 0 && fileLength < chunk.range.endOffset ) + break; // Stop, file is shorter than end of this chunk + if ( missingChunks.remove( chunk ) || pendingChunks.remove( chunk ) ) { + completeChunks.add( chunk ); + } + if ( missing ) { + // Trigger hashing + chunk.setStatus( ChunkStatus.HASHING ); + } else { + // Assume complete + chunk.setStatus( ChunkStatus.COMPLETE ); + } + index++; + } + } + + /** * Get a chunk that is marked complete, has a sha1 hash, but has not been hash-checked yet. * * @return chunk @@ -166,7 +197,7 @@ public class ChunkList public synchronized int markFailed( FileChunk c ) { if ( !pendingChunks.remove( c ) ) { - LOGGER.warn( "Inconsistent state: markTransferred called for Chunk " + c.toString() + LOGGER.warn( "Inconsistent state: markFailed called for Chunk " + c.toString() + ", but chunk is not marked as currently transferring!" ); return -1; } @@ -178,6 +209,25 @@ public class ChunkList } /** + * Mark a missing chunk as complete. + */ + private synchronized boolean markMissingAsComplete( int index ) + { + FileChunk chunk = allChunks.get( index ); + if ( completeChunks.contains( chunk ) ) + return true; + if ( !missingChunks.remove( chunk ) ) { + LOGGER.warn( "Inconsistent state: markMissingAsComplete called for chunk " + chunk.toString() + " (indexed as " + index + + ") which is not missing" ); + return false; + } + chunk.setStatus( ChunkStatus.COMPLETE ); + completeChunks.add( chunk ); + this.notifyAll(); + return true; + } + + /** * Check if all blocks in this list are marked as successfully transfered. If a complete chunk is * marked as "hashing", or if there are some complete chunks without a sha1sum and some with a * sha1sum, the transfer is considered incomplete. @@ -259,4 +309,48 @@ public class ChunkList return allChunks; } + public static boolean hashListsEqualFcBb( List<FileChunk> one, List<ByteBuffer> two ) + { + return hashListsEqualFcArray( one, ThriftUtil.unwrapByteBufferList( two ) ); + } + + public static boolean hashListsEqualFcArray( List<FileChunk> one, List<byte[]> two ) + { + if ( one.size() != two.size() ) + return false; + FileChunk first = one.get( 0 ); + if ( first == null || first.getSha1Sum() == null ) + return false; + Iterator<byte[]> it = two.iterator(); + for ( FileChunk existingChunk : one ) { + byte[] testChunk = it.next(); + if ( !Arrays.equals( testChunk, existingChunk.getSha1Sum() ) ) + return false; + } + return true; + } + + public static boolean hashListsEqualBbBb( List<ByteBuffer> list1, List<ByteBuffer> list2 ) + { + return hashListsEqualBbArray( list1, ThriftUtil.unwrapByteBufferList( list2 ) ); + } + + public static boolean hashListsEqualBbArray( List<ByteBuffer> bufferList, List<byte[]> arrayList ) + { + return hashListsEqualArray( ThriftUtil.unwrapByteBufferList( bufferList ), arrayList ); + } + + public static boolean hashListsEqualArray( List<byte[]> list1, List<byte[]> list2 ) + { + if ( list1.size() != list2.size() ) + return false; + Iterator<byte[]> it1 = list1.iterator(); + Iterator<byte[]> it2 = list2.iterator(); + while ( it1.hasNext() && it2.hasNext() ) { + if ( !Arrays.equals( it1.next(), it2.next() ) ) + return false; + } + return true; + } + } diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java index 62f7d46..0aff296 100644 --- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java +++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java @@ -31,7 +31,7 @@ public class FileChunk } } - public synchronized void setSha1Sum( byte[] sha1sum ) + synchronized void setSha1Sum( byte[] sha1sum ) { if ( this.sha1sum != null || sha1sum == null || sha1sum.length != SHA1_LENGTH ) return; @@ -47,7 +47,7 @@ public class FileChunk * * @return Number of times the transfer failed now */ - public synchronized int incFailed() + synchronized int incFailed() { return ++failCount; } @@ -82,7 +82,7 @@ public class FileChunk return writtenToDisk; } - protected synchronized void setStatus( ChunkStatus status ) + synchronized void setStatus( ChunkStatus status ) { if ( status != null ) { if ( status == ChunkStatus.COMPLETE ) { diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index 5b647aa..d9db7df 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -75,7 +75,16 @@ public class HashChecker } } - public void queue( FileChunk chunk, byte[] data, HashCheckCallback callback ) throws InterruptedException + /** + * Queue the given chunk for hashing. The chunk should be in pending state. + * + * @param chunk chunk to hash + * @param data binary data of this chunk + * @param callback callback to call when hashing is done + * @return true if the chunk was handled, false if the queue was full and rejected the chunk. + * @throws InterruptedException + */ + public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, boolean blocking ) throws InterruptedException { byte[] sha1Sum = chunk.getSha1Sum(); if ( sha1Sum == null ) @@ -84,7 +93,7 @@ public class HashChecker synchronized ( threads ) { if ( invalid ) { execCallback( task, HashResult.FAILURE ); - return; + return true; } if ( queue.remainingCapacity() <= 1 && threads.size() < Runtime.getRuntime().availableProcessors() ) { try { @@ -95,8 +104,18 @@ public class HashChecker LOGGER.warn( "Could not create additional hash checking thread", e ); } } + } + ChunkStatus old = chunk.getStatus(); + chunk.setStatus( ChunkStatus.HASHING ); + if ( blocking ) { queue.put( task ); + } else { + if ( !queue.offer( task ) ) { + chunk.setStatus( old ); + return false; + } } + return true; } // ############################################################# \\ @@ -171,7 +190,6 @@ public class HashChecker this.data = data; this.chunk = chunk; this.callback = callback; - chunk.setStatus( ChunkStatus.HASHING ); } } diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index b738ef6..9406c27 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -1,5 +1,6 @@ package org.openslx.filetransfer.util; +import java.io.EOFException; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -181,16 +182,26 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( hashChecker == null ) return; FileChunk chunk; - while ( null != ( chunk = chunks.getUnhashedComplete() ) ) { - byte[] data = loadChunkFromFile( chunk ); + int cnt = 0; + while ( null != ( chunk = chunks.getUnhashedComplete() ) && ++cnt <= 3 ) { + byte[] data; + try { + data = loadChunkFromFile( chunk ); + } catch ( EOFException e1 ) { + LOGGER.warn( "blockhash update: file too short, marking chunk as invalid" ); + chunks.markFailed( chunk ); + chunkStatusChanged( chunk ); + continue; + } if ( data == null ) { - LOGGER.warn( "Will mark unloadable chunk as valid :-(" ); + LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" ); chunks.markSuccessful( chunk ); chunkStatusChanged( chunk ); continue; } try { - hashChecker.queue( chunk, data, this ); + if ( !hashChecker.queue( chunk, data, this, false ) ) // false == blocked while adding, so stop + break; } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); return; @@ -198,7 +209,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } } - private byte[] loadChunkFromFile( FileChunk chunk ) + private byte[] loadChunkFromFile( FileChunk chunk ) throws EOFException { synchronized ( tmpFileHandle ) { if ( state != TransferState.IDLE && state != TransferState.WORKING ) @@ -208,6 +219,8 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H byte[] buffer = new byte[ chunk.range.getLength() ]; tmpFileHandle.readFully( buffer ); return buffer; + } catch ( EOFException e ) { + throw e; } catch ( IOException e ) { LOGGER.error( "Could not read chunk " + chunk.getChunkIndex() + " of File " + getTmpFileName().toString(), e ); return null; @@ -256,7 +269,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( currentChunk != null ) { if ( hashChecker != null && currentChunk.getSha1Sum() != null ) { try { - hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this ); + hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, true ); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); return null; @@ -341,7 +354,6 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H LOGGER.warn( "Download of " + getTmpFileName().getAbsolutePath() + " failed" ); } if ( state != TransferState.FINISHED && state != TransferState.ERROR ) { - LOGGER.debug( "Download from satellite complete" ); lastActivityTime.set( System.currentTimeMillis() ); } synchronized ( downloads ) { @@ -349,6 +361,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } if ( chunks.isComplete() ) { finishUploadInternal(); + } else { + // Keep pumping unhashed chunks into the hasher + queueUnhashedChunk(); } } } ); @@ -436,6 +451,38 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunkStatusChanged( chunk ); break; } + // A block finished, see if we can queue a new one + queueUnhashedChunk(); + } + + /** + * Gets an unhashed chunk (if existent) and queues it for hashing + */ + protected void queueUnhashedChunk() + { + FileChunk chunk = chunks.getUnhashedComplete(); + if ( chunk == null ) + return; + byte[] data; + try { + data = loadChunkFromFile( chunk ); + } catch ( EOFException e1 ) { + LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking is invalid." ); + chunks.markFailed( chunk ); + chunkStatusChanged( chunk ); + return; + } + if ( data == null ) { + LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" ); + chunks.markSuccessful( chunk ); + chunkStatusChanged( chunk ); + return; + } + try { + hashChecker.queue( chunk, data, this, true ); + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } } private synchronized void finishUploadInternal() |