summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer
diff options
context:
space:
mode:
authorSimon Rettberg2016-04-13 18:38:47 +0200
committerSimon Rettberg2016-04-13 18:38:47 +0200
commit34ca2905c38d17bbded01cf7497eca790e760a39 (patch)
tree4dbaff08d7f88d48e685bd514b907c8d77571f16 /src/main/java/org/openslx/filetransfer
parentremove ruleId from NetRule struct (diff)
downloadmaster-sync-shared-34ca2905c38d17bbded01cf7497eca790e760a39.tar.gz
master-sync-shared-34ca2905c38d17bbded01cf7497eca790e760a39.tar.xz
master-sync-shared-34ca2905c38d17bbded01cf7497eca790e760a39.zip
Preparations/changes for global image sync
Diffstat (limited to 'src/main/java/org/openslx/filetransfer')
-rw-r--r--src/main/java/org/openslx/filetransfer/Transfer.java10
-rw-r--r--src/main/java/org/openslx/filetransfer/util/AbstractTransfer.java133
-rw-r--r--src/main/java/org/openslx/filetransfer/util/ChunkList.java6
-rw-r--r--src/main/java/org/openslx/filetransfer/util/FileChunk.java3
-rw-r--r--src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java475
5 files changed, 622 insertions, 5 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java
index f952bdc..cf9b475 100644
--- a/src/main/java/org/openslx/filetransfer/Transfer.java
+++ b/src/main/java/org/openslx/filetransfer/Transfer.java
@@ -110,7 +110,14 @@ public abstract class Transfer
return true;
}
- public boolean sendDone()
+ public void sendDoneAndClose()
+ {
+ sendDone();
+ sendEndOfMeta();
+ close( "Transfer finished" );
+ }
+
+ protected boolean sendDone()
{
try {
sendKeyValuePair( "DONE", "" );
@@ -202,7 +209,6 @@ public abstract class Transfer
sendErrorCode( error );
if ( callback != null )
callback.uploadError( error );
- log.info( error );
}
synchronized ( transferSocket ) {
safeClose( dataFromServer, outStream, transferSocket );
diff --git a/src/main/java/org/openslx/filetransfer/util/AbstractTransfer.java b/src/main/java/org/openslx/filetransfer/util/AbstractTransfer.java
new file mode 100644
index 0000000..75c68e8
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/util/AbstractTransfer.java
@@ -0,0 +1,133 @@
+package org.openslx.filetransfer.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.openslx.bwlp.thrift.iface.TransferInformation;
+
+public abstract class AbstractTransfer
+{
+
+ /**
+ * How long to keep this transfer information when the transfer is
+ * (potentially) done
+ */
+ private static final long FINISH_TIMEOUT = TimeUnit.MINUTES.toMillis( 3 );
+
+ /**
+ * How long to keep this transfer information when there are no active
+ * connections and the transfer seems unfinished
+ */
+ private static final long IDLE_TIMEOUT = TimeUnit.HOURS.toMillis( 6 );
+
+ /**
+ * How long to count this transfer towards active transfers when it has
+ * no active connection.
+ */
+ private static final long HOT_IDLE_TIMEOUT = TimeUnit.MINUTES.toMillis( 10 );
+ /**
+ * Time stamp of when (we think) the transfer finished. Clients can/might
+ * not tell us they're done, and simply taking "no active connection" as a
+ * sign the download is done might have unwanted effects if the user's
+ * connection drops for a minute. If this time stamp (plus a FINISH_TIMEOUT)
+ * passed,
+ * we consider the download done and flag it for removal.
+ * If set to zero, the transfer is not finished, or not assumed to have
+ * finished.
+ */
+ protected final AtomicLong potentialFinishTime = new AtomicLong( 0 );
+
+ /**
+ * Time of last activity on this transfer.
+ */
+ protected final AtomicLong lastActivityTime = new AtomicLong( System.currentTimeMillis() );
+
+ private final String transferId;
+
+ public AbstractTransfer( String transferId )
+ {
+ this.transferId = transferId;
+ }
+
+ /**
+ * Returns true if the transfer is considered completed.
+ *
+ * @param now pass System.currentTimeMillis()
+ * @return true if the transfer is considered completed
+ */
+ public boolean isComplete( long now )
+ {
+ long val = potentialFinishTime.get();
+ return val != 0 && val + FINISH_TIMEOUT < now;
+ }
+
+ /**
+ * Returns true if there has been no activity on this transfer for a certain
+ * amount of time.
+ *
+ * @param now pass System.currentTimeMillis()
+ * @return true if the transfer reached its idle timeout
+ */
+ public final boolean hasReachedIdleTimeout( long now )
+ {
+ return getActiveConnectionCount() == 0 && lastActivityTime.get() + IDLE_TIMEOUT < now;
+ }
+
+ public final boolean countsTowardsConnectionLimit( long now )
+ {
+ return getActiveConnectionCount() > 0 || lastActivityTime.get() + HOT_IDLE_TIMEOUT > now;
+ }
+
+ public final String getId()
+ {
+ return transferId;
+ }
+
+ public abstract TransferInformation getTransferInfo();
+
+ /**
+ * Returns true if this transfer would potentially accept new connections.
+ * This should NOT return false if there are too many concurrent
+ * connections, as this is used to signal the client whether to keep trying
+ * to connect.
+ *
+ * @return true if this transfer would potentially accept new connections
+ */
+ public abstract boolean isActive();
+
+ /**
+ * Cancel this transfer, aborting all active connections and rejecting
+ * further incoming ones.
+ */
+ public abstract void cancel();
+
+ /**
+ * Returns number of active transfer connections.
+ *
+ * @return number of active transfer connections
+ */
+ public abstract int getActiveConnectionCount();
+
+ public abstract String getRelativePath();
+
+ /**
+ * Try to close everything passed to this method. Never throw an exception
+ * if it fails, just silently continue.
+ *
+ * @param item One or more objects that are AutoCloseable
+ */
+ static void safeClose( AutoCloseable... item )
+ {
+ if ( item == null )
+ return;
+ for ( AutoCloseable c : item ) {
+ if ( c == null )
+ continue;
+ try {
+ c.close();
+ } catch ( Exception e ) {
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index d0233ac..1b33102 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -31,7 +31,7 @@ public class ChunkList
private final List<FileChunk> completeChunks = new ArrayList<>( 100 );
- // 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying
+ // 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying, 5 = hashing
private final ByteBuffer statusArray;
/**
@@ -50,8 +50,8 @@ public class ChunkList
/**
* Update the sha1sums of all chunks. This is meant to be used if you passed an incomplete list
- * before (with null elements), so you don't have to has hthe whole file before starting the
- * upload, but periodically update it while thie upload is running.
+ * before (with null elements), so you don't have to hash the whole file before starting the
+ * upload, but periodically update it while the upload is running.
*
* @param sha1Sums list of sums
*/
diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
index 4b6ee74..62f7d46 100644
--- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java
+++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
@@ -8,6 +8,9 @@ import org.openslx.filetransfer.FileRange;
public class FileChunk
{
+ /**
+ * Length in bytes of binary sha1 representation
+ */
public static final int SHA1_LENGTH = 20;
public static final int CHUNK_SIZE_MIB = 16;
public static final int CHUNK_SIZE = CHUNK_SIZE_MIB * ( 1024 * 1024 );
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
new file mode 100644
index 0000000..b738ef6
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -0,0 +1,475 @@
+package org.openslx.filetransfer.util;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.log4j.Logger;
+import org.openslx.bwlp.thrift.iface.TransferState;
+import org.openslx.filetransfer.DataReceivedCallback;
+import org.openslx.filetransfer.Downloader;
+import org.openslx.filetransfer.FileRange;
+import org.openslx.filetransfer.WantRangeCallback;
+import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
+import org.openslx.filetransfer.util.HashChecker.HashResult;
+import org.openslx.util.ThriftUtil;
+
+public abstract class IncomingTransferBase extends AbstractTransfer implements HashCheckCallback
+{
+
+ private static final Logger LOGGER = Logger.getLogger( IncomingTransferBase.class );
+
+ /**
+ * Remote peer is uploading, so on our end, we have Downloaders
+ */
+ private List<Downloader> downloads = new ArrayList<>();
+
+ private final File tmpFileName;
+
+ private final RandomAccessFile tmpFileHandle;
+
+ private final ChunkList chunks;
+
+ private TransferState state = TransferState.IDLE;
+
+ private final long fileSize;
+
+ private static final HashChecker hashChecker;
+
+ /*
+ * Overridable constants
+ */
+
+ protected static int MAX_CONNECTIONS_PER_TRANSFER = 2;
+
+ /**
+ * Whether file is (still) writable. Used for the file transfer callbacks.
+ */
+ private boolean fileWritable = true;
+
+ static {
+ long maxMem = Runtime.getRuntime().maxMemory();
+ if ( maxMem == Long.MAX_VALUE ) {
+ maxMem = 512;
+ }
+ int hashQueueLen = (int) ( maxMem / 100 );
+ if ( hashQueueLen < 1 ) {
+ hashQueueLen = 1;
+ } else if ( hashQueueLen > 6 ) {
+ hashQueueLen = 6;
+ }
+ HashChecker hc;
+ try {
+ hc = new HashChecker( "SHA-1", hashQueueLen );
+ } catch ( NoSuchAlgorithmException e ) {
+ hc = null;
+ }
+ hashChecker = hc;
+ }
+
+ /*_*/
+
+ public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes )
+ throws FileNotFoundException
+ {
+ super( transferId );
+ this.fileSize = fileSize;
+ // Prepare path
+ tmpFileName = absFilePath;
+ tmpFileName.getParentFile().mkdirs();
+ tmpFileHandle = new RandomAccessFile( absFilePath, "rw" );
+ chunks = new ChunkList( fileSize, blockHashes );
+ }
+
+ @Override
+ public boolean isActive()
+ {
+ return state == TransferState.IDLE || state == TransferState.WORKING;
+ }
+
+ @Override
+ public synchronized void cancel()
+ {
+ if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
+ state = TransferState.ERROR;
+ }
+ synchronized ( downloads ) {
+ for ( Downloader download : downloads ) {
+ download.cancel();
+ }
+ }
+ lastActivityTime.set( 0 );
+ safeClose( tmpFileHandle );
+ }
+
+ @Override
+ public final int getActiveConnectionCount()
+ {
+ return downloads.size();
+ }
+
+ public final boolean hashesEqual( List<ByteBuffer> blockHashes )
+ {
+ List<FileChunk> existing = chunks.getAll();
+ if ( existing.size() != blockHashes.size() )
+ return false;
+ List<byte[]> hashes = ThriftUtil.unwrapByteBufferList( blockHashes );
+ FileChunk first = existing.get( 0 );
+ if ( first == null || first.getSha1Sum() == null )
+ return false;
+ Iterator<byte[]> it = hashes.iterator();
+ for ( FileChunk existingChunk : existing ) {
+ byte[] testChunk = it.next();
+ if ( !Arrays.equals( testChunk, existingChunk.getSha1Sum() ) )
+ return false;
+ }
+ return true;
+ }
+
+ /*
+ * Guettas for final/private fields
+ */
+
+ public final long getFileSize()
+ {
+ return fileSize;
+ }
+
+ public final File getTmpFileName()
+ {
+ return tmpFileName;
+ }
+
+ public final TransferState getState()
+ {
+ return state;
+ }
+
+ public final ChunkList getChunks()
+ {
+ return chunks;
+ }
+
+ /**
+ * It is possible to run a download where the remote peer didn't submit
+ * the full list of block hashes yet, as it might be about to hash the file
+ * while uploading. This method should be called to update the list
+ * of block hashes. This is a cumulative call, so the list must contain
+ * all hashes starting from block 0.
+ *
+ * @param hashList (incomplete) list of block hashes
+ */
+ public void updateBlockHashList( List<byte[]> hashList )
+ {
+ if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
+ LOGGER.debug( this.getId() + ": Rejecting block hash list in state " + state );
+ return;
+ }
+ if ( hashList == null ) {
+ LOGGER.debug( this.getId() + ": Rejecting null block hash list" );
+ return;
+ }
+ chunks.updateSha1Sums( hashList );
+ if ( hashChecker == null )
+ return;
+ FileChunk chunk;
+ while ( null != ( chunk = chunks.getUnhashedComplete() ) ) {
+ byte[] data = loadChunkFromFile( chunk );
+ if ( data == null ) {
+ LOGGER.warn( "Will mark unloadable chunk as valid :-(" );
+ chunks.markSuccessful( chunk );
+ chunkStatusChanged( chunk );
+ continue;
+ }
+ try {
+ hashChecker.queue( chunk, data, this );
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+
+ private byte[] loadChunkFromFile( FileChunk chunk )
+ {
+ synchronized ( tmpFileHandle ) {
+ if ( state != TransferState.IDLE && state != TransferState.WORKING )
+ return null;
+ try {
+ tmpFileHandle.seek( chunk.range.startOffset );
+ byte[] buffer = new byte[ chunk.range.getLength() ];
+ tmpFileHandle.readFully( buffer );
+ return buffer;
+ } catch ( IOException e ) {
+ LOGGER.error( "Could not read chunk " + chunk.getChunkIndex() + " of File " + getTmpFileName().toString(), e );
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Callback class for an instance of the Downloader, which supplies
+ * the Downloader with wanted file ranges, and handles incoming data.
+ */
+ private class CbHandler implements WantRangeCallback, DataReceivedCallback
+ {
+ /**
+ * The current chunk being transfered.
+ */
+ private FileChunk currentChunk = null;
+ /**
+ * Current buffer to receive to
+ */
+ private byte[] buffer = new byte[ FileChunk.CHUNK_SIZE ];
+ /**
+ * Downloader object
+ */
+ private final Downloader downloader;
+
+ private CbHandler( Downloader downloader )
+ {
+ this.downloader = downloader;
+ }
+
+ @Override
+ public boolean dataReceived( long fileOffset, int dataLength, byte[] data )
+ {
+ if ( currentChunk == null )
+ throw new IllegalStateException( "dataReceived without current chunk" );
+ if ( !currentChunk.range.contains( fileOffset, fileOffset + dataLength ) )
+ throw new IllegalStateException( "dataReceived with file data out of range" );
+ System.arraycopy( data, 0, buffer, (int) ( fileOffset - currentChunk.range.startOffset ), dataLength );
+ return fileWritable;
+ }
+
+ @Override
+ public FileRange get()
+ {
+ if ( currentChunk != null ) {
+ if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
+ try {
+ hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this );
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ try {
+ buffer = new byte[ buffer.length ];
+ } catch ( OutOfMemoryError e ) {
+ // Usually catching OOM errors is a bad idea, but it's quite safe here as
+ // we know exactly where it happened, no hidden sub-calls through 20 objects.
+ // The most likely cause here is that the hash checker/disk cannot keep up
+ // writing out completed chunks, so we just sleep a bit and try again. If it still
+ // fails, we exit completely.
+ try {
+ Thread.sleep( 6000 );
+ } catch ( InterruptedException e1 ) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ // Might raise OOM again, but THIS TIME I MEAN IT
+ try {
+ buffer = new byte[ buffer.length ];
+ } catch ( OutOfMemoryError e2 ) {
+ downloader.sendErrorCode( "Out of RAM" );
+ cancel();
+ }
+ }
+ } else {
+ // We have no hash checker or the hash for the current chunk is unknown - flush to disk
+ writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
+ chunks.markSuccessful( currentChunk );
+ chunkStatusChanged( currentChunk );
+ }
+ }
+ // Get next missing chunk
+ try {
+ currentChunk = chunks.getMissing();
+ } catch ( InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ cancel();
+ return null;
+ }
+ if ( currentChunk == null ) {
+ return null; // No more chunks, returning null tells the Downloader we're done.
+ }
+ // Check remaining disk space and abort if it's too low
+ if ( !hasEnoughFreeSpace() ) {
+ downloader.sendErrorCode( "Out of disk space" );
+ LOGGER.error( "Out of space: Cancelling upload of " + getTmpFileName().getAbsolutePath() );
+ cancel();
+ return null;
+ }
+ return currentChunk.range;
+ }
+ }
+
+ public boolean addConnection( final Downloader connection, ExecutorService pool )
+ {
+ if ( state == TransferState.FINISHED ) {
+ handleIncomingWhenFinished( connection, pool );
+ return true;
+ }
+ if ( state == TransferState.ERROR )
+ return false;
+ synchronized ( downloads ) {
+ if ( downloads.size() >= MAX_CONNECTIONS_PER_TRANSFER )
+ return false;
+ downloads.add( connection );
+ }
+ try {
+ pool.execute( new Runnable() {
+ @Override
+ public void run()
+ {
+ CbHandler cbh = new CbHandler( connection );
+ if ( !connection.download( cbh, cbh ) ) {
+ if ( cbh.currentChunk != null ) {
+ // If the download failed and we have a current chunk, put it back into
+ // the queue, so it will be handled again later...
+ chunks.markFailed( cbh.currentChunk );
+ chunkStatusChanged( cbh.currentChunk );
+ }
+ LOGGER.warn( "Download of " + getTmpFileName().getAbsolutePath() + " failed" );
+ }
+ if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
+ LOGGER.debug( "Download from satellite complete" );
+ lastActivityTime.set( System.currentTimeMillis() );
+ }
+ synchronized ( downloads ) {
+ downloads.remove( connection );
+ }
+ if ( chunks.isComplete() ) {
+ finishUploadInternal();
+ }
+ }
+ } );
+ } catch ( Exception e ) {
+ LOGGER.warn( "threadpool rejected the incoming file transfer", e );
+ synchronized ( downloads ) {
+ downloads.remove( connection );
+ }
+ return false;
+ }
+ if ( state == TransferState.IDLE ) {
+ state = TransferState.WORKING;
+ }
+ return true;
+ }
+
+ private boolean handleIncomingWhenFinished( final Downloader connection, ExecutorService pool )
+ {
+ try {
+ pool.execute( new Runnable() {
+ @Override
+ public void run()
+ {
+ connection.sendDoneAndClose();
+ }
+ } );
+ } catch ( Exception e ) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Write some data to the local file. Thread safe so we can
+ * have multiple concurrent connections.
+ *
+ * @param fileOffset
+ * @param dataLength
+ * @param data
+ * @return
+ */
+ private void writeFileData( long fileOffset, int dataLength, byte[] data )
+ {
+ synchronized ( tmpFileHandle ) {
+ if ( state != TransferState.WORKING )
+ throw new IllegalStateException( "Cannot write to file if state != WORKING" );
+ try {
+ tmpFileHandle.seek( fileOffset );
+ tmpFileHandle.write( data, 0, dataLength );
+ } catch ( IOException e ) {
+ LOGGER.error( "Cannot write to '" + getTmpFileName()
+ + "'. Disk full, network storage error, bad permissions, ...?", e );
+ fileWritable = false;
+ }
+ }
+ if ( !fileWritable ) {
+ cancel();
+ }
+ }
+
+ @Override
+ public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk )
+ {
+ if ( state != TransferState.IDLE && state != TransferState.WORKING )
+ return;
+ switch ( result ) {
+ case FAILURE:
+ LOGGER.warn( "Hash check of chunk " + chunk.toString()
+ + " could not be executed. Assuming valid :-(" );
+ // Fall through
+ case VALID:
+ if ( !chunk.isWrittenToDisk() ) {
+ writeFileData( chunk.range.startOffset, chunk.range.getLength(), data );
+ }
+ chunks.markSuccessful( chunk );
+ chunkStatusChanged( chunk );
+ if ( chunks.isComplete() ) {
+ finishUploadInternal();
+ }
+ break;
+ case INVALID:
+ LOGGER.warn( "Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch "
+ + chunk.getFailCount() + "x :-(" );
+ chunks.markFailed( chunk );
+ chunkStatusChanged( chunk );
+ break;
+ }
+ }
+
+ private synchronized void finishUploadInternal()
+ {
+ if ( state == TransferState.FINISHED ) {
+ return;
+ }
+ safeClose( tmpFileHandle );
+ if ( state != TransferState.WORKING ) {
+ state = TransferState.ERROR;
+ } else {
+ state = TransferState.FINISHED; // Races...
+ if ( !finishIncomingTransfer() ) {
+ state = TransferState.ERROR;
+ }
+ }
+ }
+
+ /*
+ *
+ */
+
+ /**
+ * Override this and return true if the destination of this download has
+ * still enough free space so we don't run into disk full errors.
+ */
+ protected abstract boolean hasEnoughFreeSpace();
+
+ /**
+ * This will be called once the download is complete.
+ * The file handle used for writing has been closed before calling this.
+ */
+ protected abstract boolean finishIncomingTransfer();
+
+ protected abstract void chunkStatusChanged( FileChunk chunk );
+
+}