summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/util
diff options
context:
space:
mode:
authorSimon Rettberg2016-04-20 17:10:14 +0200
committerSimon Rettberg2016-04-20 17:10:14 +0200
commitecd3d22510aa2f1aa0c44cee015bd690d19f45ce (patch)
tree8ec91bf9500a9575308898f0f70b5a90f0ba4737 /src/main/java/org/openslx/filetransfer/util
parentAdd queryUploadStatus to master server (diff)
downloadmaster-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.gz
master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.xz
master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.zip
More imgsync stuff
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util')
-rw-r--r--src/main/java/org/openslx/filetransfer/util/ChunkList.java4
-rw-r--r--src/main/java/org/openslx/filetransfer/util/HashChecker.java2
-rw-r--r--src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java36
-rw-r--r--src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java121
4 files changed, 144 insertions, 19 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index 372b082..c497be0 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -174,14 +174,14 @@ public class ChunkList
*
* @param c The chunk in question
*/
- public synchronized void markSuccessful( FileChunk c )
+ public synchronized void markCompleted( FileChunk c, boolean hashCheckSuccessful )
{
if ( !pendingChunks.remove( c ) ) {
LOGGER.warn( "Inconsistent state: markSuccessful called for Chunk " + c.toString()
+ ", but chunk is not marked as currently transferring!" );
return;
}
- c.setStatus( ChunkStatus.COMPLETE );
+ c.setStatus( ( hashCheckSuccessful || c.getSha1Sum() == null ) ? ChunkStatus.COMPLETE : ChunkStatus.HASHING );
completeChunks.add( c );
this.notifyAll();
}
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
index d9db7df..5fdf582 100644
--- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java
+++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
@@ -105,13 +105,11 @@ public class HashChecker
}
}
}
- ChunkStatus old = chunk.getStatus();
chunk.setStatus( ChunkStatus.HASHING );
if ( blocking ) {
queue.put( task );
} else {
if ( !queue.offer( task ) ) {
- chunk.setStatus( old );
return false;
}
}
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index 04ddc17..c2d8ee9 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -155,8 +155,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
return state;
}
- public synchronized TransferStatus getStatus() {
- return new TransferStatus(chunks.getStatusArray(), getState());
+ public synchronized TransferStatus getStatus()
+ {
+ return new TransferStatus( chunks.getStatusArray(), getState() );
}
public final ChunkList getChunks()
@@ -186,9 +187,10 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
chunks.updateSha1Sums( hashList );
if ( hashChecker == null )
return;
- FileChunk chunk;
- int cnt = 0;
- while ( null != ( chunk = chunks.getUnhashedComplete() ) && ++cnt <= 3 ) {
+ for ( int cnt = 0; cnt < 3; ++cnt ) {
+ FileChunk chunk = chunks.getUnhashedComplete();
+ if ( chunk == null )
+ break;
byte[] data;
try {
data = loadChunkFromFile( chunk );
@@ -200,14 +202,18 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
if ( data == null ) {
LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" );
- chunks.markSuccessful( chunk );
+ chunks.markCompleted( chunk, true );
chunkStatusChanged( chunk );
continue;
}
try {
- if ( !hashChecker.queue( chunk, data, this, false ) ) // false == blocked while adding, so stop
+ if ( !hashChecker.queue( chunk, data, this, false ) ) { // false == queue full, stop
+ chunks.markCompleted( chunk, false );
break;
+ }
} catch ( InterruptedException e ) {
+ LOGGER.debug( "updateBlockHashList got interrupted" );
+ chunks.markCompleted( chunk, false );
Thread.currentThread().interrupt();
return;
}
@@ -304,7 +310,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
} else {
// We have no hash checker or the hash for the current chunk is unknown - flush to disk
writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
- chunks.markSuccessful( currentChunk );
+ chunks.markCompleted( currentChunk, true );
chunkStatusChanged( currentChunk );
}
}
@@ -356,7 +362,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
chunks.markFailed( cbh.currentChunk );
chunkStatusChanged( cbh.currentChunk );
}
- LOGGER.warn( "Download of " + getTmpFileName().getAbsolutePath() + " failed" );
+ LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" );
}
if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
lastActivityTime.set( System.currentTimeMillis() );
@@ -368,7 +374,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
finishUploadInternal();
} else {
// Keep pumping unhashed chunks into the hasher
- queueUnhashedChunk();
+ queueUnhashedChunk( true );
}
}
} );
@@ -443,7 +449,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( !chunk.isWrittenToDisk() ) {
writeFileData( chunk.range.startOffset, chunk.range.getLength(), data );
}
- chunks.markSuccessful( chunk );
+ chunks.markCompleted( chunk, true );
chunkStatusChanged( chunk );
if ( chunks.isComplete() ) {
finishUploadInternal();
@@ -457,13 +463,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
break;
}
// A block finished, see if we can queue a new one
- queueUnhashedChunk();
+ queueUnhashedChunk( false );
}
/**
* Gets an unhashed chunk (if existent) and queues it for hashing
*/
- protected void queueUnhashedChunk()
+ protected void queueUnhashedChunk( boolean blocking )
{
FileChunk chunk = chunks.getUnhashedComplete();
if ( chunk == null )
@@ -479,12 +485,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
if ( data == null ) {
LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" );
- chunks.markSuccessful( chunk );
+ chunks.markCompleted( chunk, true );
chunkStatusChanged( chunk );
return;
}
try {
- hashChecker.queue( chunk, data, this, true );
+ hashChecker.queue( chunk, data, this, blocking );
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
}
diff --git a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
new file mode 100644
index 0000000..12deddb
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
@@ -0,0 +1,121 @@
+package org.openslx.filetransfer.util;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.openslx.bwlp.thrift.iface.TransferInformation;
+import org.openslx.filetransfer.Uploader;
+
+public abstract class OutgoingTransferBase extends AbstractTransfer
+{
+
+ /*
+ * Constants
+ */
+
+ private static final Logger LOGGER = Logger.getLogger( OutgoingTransferBase.class );
+
+ private static final long INACTIVITY_TIMEOUT = TimeUnit.MINUTES.toMillis( 5 );
+
+ /*
+ * Overridable constants
+ */
+
+ protected static int MAX_CONNECTIONS_PER_TRANSFER = 2;
+
+ /*
+ * Class members
+ */
+
+ /**
+ * Remote peer is downloading, so we have Uploaders
+ */
+ private final List<Uploader> uploads = new ArrayList<>();
+
+ /**
+ * File being uploaded
+ */
+ private final File sourceFile;
+
+ private final TransferInformation transferInformation;
+
+ public OutgoingTransferBase( String transferId, File sourceFile, int plainPort, int sslPort )
+ {
+ super( transferId );
+ this.sourceFile = sourceFile;
+ this.transferInformation = new TransferInformation( transferId, plainPort, sslPort );
+ }
+
+ /**
+ * Add another connection for this file transfer.
+ *
+ * @param connection
+ * @return true if the connection is accepted, false if it should be
+ * discarded
+ */
+ public synchronized boolean addConnection( final Uploader connection, ExecutorService pool )
+ {
+ synchronized ( uploads ) {
+ if ( uploads.size() >= MAX_CONNECTIONS_PER_TRANSFER )
+ return false;
+ uploads.add( connection );
+ }
+ return runConnectionInternal( connection, pool );
+ }
+
+ protected boolean runConnectionInternal( final Uploader connection, ExecutorService pool )
+ {
+ try {
+ pool.execute( new Runnable() {
+ @Override
+ public void run()
+ {
+ boolean ret = connection.upload( sourceFile.getAbsolutePath() );
+ synchronized ( uploads ) {
+ uploads.remove( connection );
+ }
+ if ( ret && uploads.isEmpty() && potentialFinishTime.get() == 0 ) {
+ potentialFinishTime.set( System.currentTimeMillis() );
+ }
+ lastActivityTime.set( System.currentTimeMillis() );
+ }
+ } );
+ } catch ( Exception e ) {
+ LOGGER.warn( "threadpool rejected the incoming file transfer", e );
+ synchronized ( uploads ) {
+ uploads.remove( connection );
+ }
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public TransferInformation getTransferInfo()
+ {
+ return transferInformation;
+ }
+
+ @Override
+ public final boolean isActive()
+ {
+ return uploads.size() > 0 || lastActivityTime.get() + INACTIVITY_TIMEOUT > System.currentTimeMillis();
+ }
+
+ @Override
+ public void cancel()
+ {
+ // Void
+ }
+
+ @Override
+ public final int getActiveConnectionCount()
+ {
+ return uploads.size();
+ }
+
+}