diff options
author | Simon Rettberg | 2016-04-20 17:10:14 +0200 |
---|---|---|
committer | Simon Rettberg | 2016-04-20 17:10:14 +0200 |
commit | ecd3d22510aa2f1aa0c44cee015bd690d19f45ce (patch) | |
tree | 8ec91bf9500a9575308898f0f70b5a90f0ba4737 /src/main/java/org/openslx/filetransfer/util | |
parent | Add queryUploadStatus to master server (diff) | |
download | master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.gz master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.xz master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.zip |
More imgsync stuff
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/util')
4 files changed, 144 insertions, 19 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index 372b082..c497be0 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -174,14 +174,14 @@ public class ChunkList * * @param c The chunk in question */ - public synchronized void markSuccessful( FileChunk c ) + public synchronized void markCompleted( FileChunk c, boolean hashCheckSuccessful ) { if ( !pendingChunks.remove( c ) ) { LOGGER.warn( "Inconsistent state: markSuccessful called for Chunk " + c.toString() + ", but chunk is not marked as currently transferring!" ); return; } - c.setStatus( ChunkStatus.COMPLETE ); + c.setStatus( ( hashCheckSuccessful || c.getSha1Sum() == null ) ? ChunkStatus.COMPLETE : ChunkStatus.HASHING ); completeChunks.add( c ); this.notifyAll(); } diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index d9db7df..5fdf582 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -105,13 +105,11 @@ public class HashChecker } } } - ChunkStatus old = chunk.getStatus(); chunk.setStatus( ChunkStatus.HASHING ); if ( blocking ) { queue.put( task ); } else { if ( !queue.offer( task ) ) { - chunk.setStatus( old ); return false; } } diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 04ddc17..c2d8ee9 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -155,8 +155,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return state; } - public synchronized TransferStatus getStatus() { - return new TransferStatus(chunks.getStatusArray(), getState()); + public synchronized TransferStatus getStatus() + { + return new TransferStatus( chunks.getStatusArray(), getState() ); } public final ChunkList getChunks() @@ -186,9 +187,10 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunks.updateSha1Sums( hashList ); if ( hashChecker == null ) return; - FileChunk chunk; - int cnt = 0; - while ( null != ( chunk = chunks.getUnhashedComplete() ) && ++cnt <= 3 ) { + for ( int cnt = 0; cnt < 3; ++cnt ) { + FileChunk chunk = chunks.getUnhashedComplete(); + if ( chunk == null ) + break; byte[] data; try { data = loadChunkFromFile( chunk ); @@ -200,14 +202,18 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } if ( data == null ) { LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" ); - chunks.markSuccessful( chunk ); + chunks.markCompleted( chunk, true ); chunkStatusChanged( chunk ); continue; } try { - if ( !hashChecker.queue( chunk, data, this, false ) ) // false == blocked while adding, so stop + if ( !hashChecker.queue( chunk, data, this, false ) ) { // false == queue full, stop + chunks.markCompleted( chunk, false ); break; + } } catch ( InterruptedException e ) { + LOGGER.debug( "updateBlockHashList got interrupted" ); + chunks.markCompleted( chunk, false ); Thread.currentThread().interrupt(); return; } @@ -304,7 +310,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } else { // We have no hash checker or the hash for the current chunk is unknown - flush to disk writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer ); - chunks.markSuccessful( currentChunk ); + chunks.markCompleted( currentChunk, true ); chunkStatusChanged( currentChunk ); } } @@ -356,7 +362,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunks.markFailed( cbh.currentChunk ); chunkStatusChanged( cbh.currentChunk ); } - LOGGER.warn( "Download of " + getTmpFileName().getAbsolutePath() + " failed" ); + LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" ); } if ( state != TransferState.FINISHED && state != TransferState.ERROR ) { lastActivityTime.set( System.currentTimeMillis() ); @@ -368,7 +374,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H finishUploadInternal(); } else { // Keep pumping unhashed chunks into the hasher - queueUnhashedChunk(); + queueUnhashedChunk( true ); } } } ); @@ -443,7 +449,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( !chunk.isWrittenToDisk() ) { writeFileData( chunk.range.startOffset, chunk.range.getLength(), data ); } - chunks.markSuccessful( chunk ); + chunks.markCompleted( chunk, true ); chunkStatusChanged( chunk ); if ( chunks.isComplete() ) { finishUploadInternal(); @@ -457,13 +463,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H break; } // A block finished, see if we can queue a new one - queueUnhashedChunk(); + queueUnhashedChunk( false ); } /** * Gets an unhashed chunk (if existent) and queues it for hashing */ - protected void queueUnhashedChunk() + protected void queueUnhashedChunk( boolean blocking ) { FileChunk chunk = chunks.getUnhashedComplete(); if ( chunk == null ) @@ -479,12 +485,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } if ( data == null ) { LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" ); - chunks.markSuccessful( chunk ); + chunks.markCompleted( chunk, true ); chunkStatusChanged( chunk ); return; } try { - hashChecker.queue( chunk, data, this, true ); + hashChecker.queue( chunk, data, this, blocking ); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } diff --git a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java new file mode 100644 index 0000000..12deddb --- /dev/null +++ b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java @@ -0,0 +1,121 @@ +package org.openslx.filetransfer.util; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.openslx.bwlp.thrift.iface.TransferInformation; +import org.openslx.filetransfer.Uploader; + +public abstract class OutgoingTransferBase extends AbstractTransfer +{ + + /* + * Constants + */ + + private static final Logger LOGGER = Logger.getLogger( OutgoingTransferBase.class ); + + private static final long INACTIVITY_TIMEOUT = TimeUnit.MINUTES.toMillis( 5 ); + + /* + * Overridable constants + */ + + protected static int MAX_CONNECTIONS_PER_TRANSFER = 2; + + /* + * Class members + */ + + /** + * Remote peer is downloading, so we have Uploaders + */ + private final List<Uploader> uploads = new ArrayList<>(); + + /** + * File being uploaded + */ + private final File sourceFile; + + private final TransferInformation transferInformation; + + public OutgoingTransferBase( String transferId, File sourceFile, int plainPort, int sslPort ) + { + super( transferId ); + this.sourceFile = sourceFile; + this.transferInformation = new TransferInformation( transferId, plainPort, sslPort ); + } + + /** + * Add another connection for this file transfer. + * + * @param connection + * @return true if the connection is accepted, false if it should be + * discarded + */ + public synchronized boolean addConnection( final Uploader connection, ExecutorService pool ) + { + synchronized ( uploads ) { + if ( uploads.size() >= MAX_CONNECTIONS_PER_TRANSFER ) + return false; + uploads.add( connection ); + } + return runConnectionInternal( connection, pool ); + } + + protected boolean runConnectionInternal( final Uploader connection, ExecutorService pool ) + { + try { + pool.execute( new Runnable() { + @Override + public void run() + { + boolean ret = connection.upload( sourceFile.getAbsolutePath() ); + synchronized ( uploads ) { + uploads.remove( connection ); + } + if ( ret && uploads.isEmpty() && potentialFinishTime.get() == 0 ) { + potentialFinishTime.set( System.currentTimeMillis() ); + } + lastActivityTime.set( System.currentTimeMillis() ); + } + } ); + } catch ( Exception e ) { + LOGGER.warn( "threadpool rejected the incoming file transfer", e ); + synchronized ( uploads ) { + uploads.remove( connection ); + } + return false; + } + return true; + } + + @Override + public TransferInformation getTransferInfo() + { + return transferInformation; + } + + @Override + public final boolean isActive() + { + return uploads.size() > 0 || lastActivityTime.get() + INACTIVITY_TIMEOUT > System.currentTimeMillis(); + } + + @Override + public void cancel() + { + // Void + } + + @Override + public final int getActiveConnectionCount() + { + return uploads.size(); + } + +} |