summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
diff options
context:
space:
mode:
authorSimon Rettberg2016-04-18 15:18:05 +0200
committerSimon Rettberg2016-04-18 15:18:05 +0200
commitcc70f09431deb7937e01cc6583884fb5067a2994 (patch)
treefcf7c8720a4479b09e07c82eb13f7015bb4d0533 /src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
parentPreparations/changes for global image sync (diff)
downloadmaster-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.tar.gz
master-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.tar.xz
master-sync-shared-cc70f09431deb7937e01cc6583884fb5067a2994.zip
More additions for central image store
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java')
-rw-r--r--src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java61
1 files changed, 54 insertions, 7 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index b738ef6..9406c27 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -1,5 +1,6 @@
package org.openslx.filetransfer.util;
+import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -181,16 +182,26 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( hashChecker == null )
return;
FileChunk chunk;
- while ( null != ( chunk = chunks.getUnhashedComplete() ) ) {
- byte[] data = loadChunkFromFile( chunk );
+ int cnt = 0;
+ while ( null != ( chunk = chunks.getUnhashedComplete() ) && ++cnt <= 3 ) {
+ byte[] data;
+ try {
+ data = loadChunkFromFile( chunk );
+ } catch ( EOFException e1 ) {
+ LOGGER.warn( "blockhash update: file too short, marking chunk as invalid" );
+ chunks.markFailed( chunk );
+ chunkStatusChanged( chunk );
+ continue;
+ }
if ( data == null ) {
- LOGGER.warn( "Will mark unloadable chunk as valid :-(" );
+ LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" );
chunks.markSuccessful( chunk );
chunkStatusChanged( chunk );
continue;
}
try {
- hashChecker.queue( chunk, data, this );
+ if ( !hashChecker.queue( chunk, data, this, false ) ) // false == blocked while adding, so stop
+ break;
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
return;
@@ -198,7 +209,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
}
- private byte[] loadChunkFromFile( FileChunk chunk )
+ private byte[] loadChunkFromFile( FileChunk chunk ) throws EOFException
{
synchronized ( tmpFileHandle ) {
if ( state != TransferState.IDLE && state != TransferState.WORKING )
@@ -208,6 +219,8 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
byte[] buffer = new byte[ chunk.range.getLength() ];
tmpFileHandle.readFully( buffer );
return buffer;
+ } catch ( EOFException e ) {
+ throw e;
} catch ( IOException e ) {
LOGGER.error( "Could not read chunk " + chunk.getChunkIndex() + " of File " + getTmpFileName().toString(), e );
return null;
@@ -256,7 +269,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( currentChunk != null ) {
if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
try {
- hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this );
+ hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, true );
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
return null;
@@ -341,7 +354,6 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
LOGGER.warn( "Download of " + getTmpFileName().getAbsolutePath() + " failed" );
}
if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
- LOGGER.debug( "Download from satellite complete" );
lastActivityTime.set( System.currentTimeMillis() );
}
synchronized ( downloads ) {
@@ -349,6 +361,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
if ( chunks.isComplete() ) {
finishUploadInternal();
+ } else {
+ // Keep pumping unhashed chunks into the hasher
+ queueUnhashedChunk();
}
}
} );
@@ -436,6 +451,38 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
chunkStatusChanged( chunk );
break;
}
+ // A block finished, see if we can queue a new one
+ queueUnhashedChunk();
+ }
+
+ /**
+ * Gets an unhashed chunk (if existent) and queues it for hashing
+ */
+ protected void queueUnhashedChunk()
+ {
+ FileChunk chunk = chunks.getUnhashedComplete();
+ if ( chunk == null )
+ return;
+ byte[] data;
+ try {
+ data = loadChunkFromFile( chunk );
+ } catch ( EOFException e1 ) {
+ LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking is invalid." );
+ chunks.markFailed( chunk );
+ chunkStatusChanged( chunk );
+ return;
+ }
+ if ( data == null ) {
+ LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" );
+ chunks.markSuccessful( chunk );
+ chunkStatusChanged( chunk );
+ return;
+ }
+ try {
+ hashChecker.queue( chunk, data, this, true );
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ }
}
private synchronized void finishUploadInternal()