diff options
author | Simon Rettberg | 2016-04-18 15:18:05 +0200 |
---|---|---|
committer | Simon Rettberg | 2016-04-18 15:18:05 +0200 |
commit | cc70f09431deb7937e01cc6583884fb5067a2994 (patch) | |
tree | fcf7c8720a4479b09e07c82eb13f7015bb4d0533 /src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java | |
parent | Preparations/changes for global image sync (diff) | |
download | master-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.tar.gz master-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.tar.xz master-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.zip |
More additions for central image store
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java')
-rw-r--r-- | src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java | 61 |
1 files changed, 54 insertions, 7 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index b738ef6..9406c27 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -1,5 +1,6 @@ package org.openslx.filetransfer.util; +import java.io.EOFException; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -181,16 +182,26 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( hashChecker == null ) return; FileChunk chunk; - while ( null != ( chunk = chunks.getUnhashedComplete() ) ) { - byte[] data = loadChunkFromFile( chunk ); + int cnt = 0; + while ( null != ( chunk = chunks.getUnhashedComplete() ) && ++cnt <= 3 ) { + byte[] data; + try { + data = loadChunkFromFile( chunk ); + } catch ( EOFException e1 ) { + LOGGER.warn( "blockhash update: file too short, marking chunk as invalid" ); + chunks.markFailed( chunk ); + chunkStatusChanged( chunk ); + continue; + } if ( data == null ) { - LOGGER.warn( "Will mark unloadable chunk as valid :-(" ); + LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" ); chunks.markSuccessful( chunk ); chunkStatusChanged( chunk ); continue; } try { - hashChecker.queue( chunk, data, this ); + if ( !hashChecker.queue( chunk, data, this, false ) ) // false == blocked while adding, so stop + break; } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); return; @@ -198,7 +209,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } } - private byte[] loadChunkFromFile( FileChunk chunk ) + private byte[] loadChunkFromFile( FileChunk chunk ) throws EOFException { synchronized ( tmpFileHandle ) { if ( state != TransferState.IDLE && state != TransferState.WORKING ) @@ -208,6 +219,8 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H byte[] buffer = new byte[ chunk.range.getLength() ]; tmpFileHandle.readFully( buffer ); return buffer; + } catch ( EOFException e ) { + throw e; } catch ( IOException e ) { LOGGER.error( "Could not read chunk " + chunk.getChunkIndex() + " of File " + getTmpFileName().toString(), e ); return null; @@ -256,7 +269,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( currentChunk != null ) { if ( hashChecker != null && currentChunk.getSha1Sum() != null ) { try { - hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this ); + hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, true ); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); return null; @@ -341,7 +354,6 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H LOGGER.warn( "Download of " + getTmpFileName().getAbsolutePath() + " failed" ); } if ( state != TransferState.FINISHED && state != TransferState.ERROR ) { - LOGGER.debug( "Download from satellite complete" ); lastActivityTime.set( System.currentTimeMillis() ); } synchronized ( downloads ) { @@ -349,6 +361,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } if ( chunks.isComplete() ) { finishUploadInternal(); + } else { + // Keep pumping unhashed chunks into the hasher + queueUnhashedChunk(); } } } ); @@ -436,6 +451,38 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunkStatusChanged( chunk ); break; } + // A block finished, see if we can queue a new one + queueUnhashedChunk(); + } + + /** + * Gets an unhashed chunk (if existent) and queues it for hashing + */ + protected void queueUnhashedChunk() + { + FileChunk chunk = chunks.getUnhashedComplete(); + if ( chunk == null ) + return; + byte[] data; + try { + data = loadChunkFromFile( chunk ); + } catch ( EOFException e1 ) { + LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking is invalid." ); + chunks.markFailed( chunk ); + chunkStatusChanged( chunk ); + return; + } + if ( data == null ) { + LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" ); + chunks.markSuccessful( chunk ); + chunkStatusChanged( chunk ); + return; + } + try { + hashChecker.queue( chunk, data, this, true ); + } catch ( InterruptedException e ) { + Thread.currentThread().interrupt(); + } } private synchronized void finishUploadInternal() |