package org.openslx.filetransfer.util;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.LocalChunkSource;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
import org.openslx.filetransfer.util.HashChecker.HashResult;
import org.openslx.util.ThriftUtil;

public abstract class IncomingTransferBase extends AbstractTransfer implements HashCheckCallback
{

	private static final Logger LOGGER = Logger.getLogger( IncomingTransferBase.class );

	/**
	 * Remote peer is uploading, so on our end, we have Downloaders
	 */
	private List<Downloader> downloads = new ArrayList<>();

	private final File tmpFileName;

	private final RandomAccessFile tmpFileHandle;

	private final ChunkList chunks;

	private TransferState state = TransferState.IDLE;

	private final long fileSize;

	private static final HashChecker hashChecker;

	/*
	 * Overridable constants
	 */

	protected static int MAX_CONNECTIONS_PER_TRANSFER = 2;

	/**
	 * Whether the file is (still) writable. Used for the file transfer callbacks.
	 */
	private boolean fileWritable = true;

	/**
	 * Called for getting local sources for certain chunks by checksum
	 */
	private final LocalChunkSource localChunkSource;

	/**
	 * Non-null if local copying is requested
	 */
	private final LocalCopyManager localCopyManager;

	static {
		long maxMem = Runtime.getRuntime().maxMemory();
		if ( maxMem == Long.MAX_VALUE ) {
			LOGGER.warn( "Cannot determine maximum JVM memory -- assuming 1GB -- this might not be safe" );
			maxMem = 1024;
		} else {
			maxMem /= ( 1024 * 1024 );
		}
		final int maxLen = Math.max( 6, Runtime.getRuntime().availableProcessors() );
		int hashQueueLen = (int) ( maxMem / 150 );
		if ( hashQueueLen < 1 ) {
			hashQueueLen = 1;
		} else if ( hashQueueLen > maxLen ) {
			hashQueueLen = maxLen;
		}
		LOGGER.debug( "Queue length: " + hashQueueLen );
		HashChecker hc;
		try {
			hc = new HashChecker( "SHA-1", hashQueueLen );
		} catch ( NoSuchAlgorithmException e ) {
			hc = null;
		}
		hashChecker = hc;
	}

	public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes,
			LocalChunkSource localChunkSource ) throws FileNotFoundException
	{
		super( transferId );
		this.fileSize = fileSize;
		this.localChunkSource = localChunkSource;
		// Prepare path
		tmpFileName = absFilePath;
		tmpFileName.getParentFile().mkdirs();
		tmpFileHandle = new RandomAccessFile( absFilePath, "rw" );
		try {
			if ( tmpFileHandle.length() > fileSize ) {
				tmpFileHandle.setLength( fileSize );
			}
		} catch ( IOException e ) {
			LOGGER.debug( "File " + tmpFileName + " is too long and could not be truncated" );
		}
		chunks = new ChunkList( fileSize, blockHashes );
		if ( this.localChunkSource != null ) {
			this.localCopyManager = new LocalCopyManager( this, this.chunks );
			this.localCopyManager.start();
			checkLocalCopyCandidates( blockHashes, 0 );
		} else {
			this.localCopyManager = null;
		}
	}
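	/**
	 * Whether this transfer is still considered active, i.e. it is waiting
	 * for connections (IDLE) or currently receiving data (WORKING).
	 */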
	@Override
	public boolean isActive()
	{
		return state == TransferState.IDLE || state == TransferState.WORKING;
	}

	@Override
	public synchronized void cancel()
	{
		if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
			state = TransferState.ERROR;
		}
		synchronized ( downloads ) {
			for ( Downloader download : downloads ) {
				download.cancel();
			}
		}
		potentialFinishTime.set( 0 );
		if ( localCopyManager != null ) {
			localCopyManager.interrupt();
		}
		safeClose( tmpFileHandle );
	}

	@Override
	public final int getActiveConnectionCount()
	{
		return downloads.size();
	}

	public final boolean hashesEqual( List<ByteBuffer> blockHashes )
	{
		List<FileChunk> existing = chunks.getAll();
		if ( existing.size() != blockHashes.size() )
			return false;
		List<byte[]> hashes = ThriftUtil.unwrapByteBufferList( blockHashes );
		FileChunk first = existing.get( 0 );
		if ( first == null || first.getSha1Sum() == null )
			return false;
		Iterator<byte[]> it = hashes.iterator();
		for ( FileChunk existingChunk : existing ) {
			byte[] testChunk = it.next();
			if ( !Arrays.equals( testChunk, existingChunk.getSha1Sum() ) )
				return false;
		}
		return true;
	}

	/*
	 * Getters for final/private fields
	 */

	public final long getFileSize()
	{
		return fileSize;
	}

	public final File getTmpFileName()
	{
		return tmpFileName;
	}

	public final TransferState getState()
	{
		return state;
	}

	public synchronized TransferStatus getStatus()
	{
		return new TransferStatus( chunks.getStatusArray(), getState() );
	}

	public final ChunkList getChunks()
	{
		return chunks;
	}
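	// Illustrative example of the cumulative contract documented below
	// (h0..h4 are hypothetical SHA-1 sums): after hashing three blocks the
	// peer may send
	//     updateBlockHashList( Arrays.asList( h0, h1, h2 ) );
	// and later, once more blocks are hashed, the whole prefix again:
	//     updateBlockHashList( Arrays.asList( h0, h1, h2, h3, h4 ) );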
	/**
	 * It is possible to run a download where the remote peer didn't submit
	 * the full list of block hashes yet, as it might still be hashing the file
	 * while uploading. This method should be called to update the list
	 * of block hashes. This is a cumulative call, so the list must contain
	 * all hashes starting from block 0.
	 *
	 * @param hashList (possibly incomplete) list of block hashes
	 */
	public void updateBlockHashList( List<byte[]> hashList )
	{
		if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
			LOGGER.debug( this.getId() + ": Rejecting block hash list in state " + state );
			return;
		}
		if ( hashList == null ) {
			LOGGER.debug( this.getId() + ": Rejecting null block hash list" );
			return;
		}
		int firstNew = chunks.updateSha1Sums( hashList );
		// No hash checker? Neither hashing nor server side dedup will make sense
		if ( hashChecker == null )
			return;
		// Check hashes of completed blocks
		for ( int cnt = 0; cnt < 3; ++cnt ) {
			FileChunk chunk = chunks.getUnhashedComplete();
			if ( chunk == null )
				break;
			byte[] data = null;
			try {
				data = loadChunkFromFile( chunk );
			} catch ( EOFException e1 ) {
				LOGGER.warn( "blockhash update: file too short, marking chunk as invalid" );
				chunks.markFailed( chunk );
				chunkStatusChanged( chunk );
				continue;
			} catch ( Exception e ) {
				LOGGER.warn( "unexpected failure while loading chunk from disk", e );
			}
			if ( data == null ) {
				LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" );
				chunks.markCompleted( chunk, true );
				chunkStatusChanged( chunk );
				continue;
			}
			try {
				if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) {
					// false == queue full, stop
					chunks.markCompleted( chunk, false );
					break;
				}
			} catch ( InterruptedException e ) {
				LOGGER.debug( "updateBlockHashList got interrupted" );
				chunks.markCompleted( chunk, false );
				Thread.currentThread().interrupt();
				return;
			}
		}
		// See if we have any candidates for local copy
		checkLocalCopyCandidates( hashList, firstNew );
	}

	private void checkLocalCopyCandidates( List<byte[]> hashList, int firstNew )
	{
		if ( localChunkSource == null || hashList == null || hashList.isEmpty() )
			return;
		List<byte[]> sums;
		if ( firstNew <= 0 ) {
			sums = hashList;
		} else {
			sums = hashList.subList( firstNew, hashList.size() );
		}
		if ( sums == null )
			return;
		sums = Collections.unmodifiableList( sums );
		List<ChunkSource> sources = null;
		try {
			sources = localChunkSource.getCloneSources( sums );
		} catch ( Exception e ) {
			LOGGER.warn( "Could not get chunk sources", e );
		}
		if ( sources != null && !sources.isEmpty() ) {
			chunks.markLocalCopyCandidates( sources );
		}
		localCopyManager.trigger();
	}

	private byte[] loadChunkFromFile( FileChunk chunk ) throws EOFException
	{
		synchronized ( tmpFileHandle ) {
			if ( state != TransferState.IDLE && state != TransferState.WORKING )
				return null;
			try {
				tmpFileHandle.seek( chunk.range.startOffset );
				byte[] buffer = new byte[ chunk.range.getLength() ];
				tmpFileHandle.readFully( buffer );
				return buffer;
			} catch ( EOFException e ) {
				throw e;
			} catch ( IOException e ) {
				LOGGER.error( "Could not read chunk " + chunk.getChunkIndex() + " of file "
						+ getTmpFileName().toString(), e );
				return null;
			}
		}
	}
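	// Receive flow in a nutshell: the Downloader repeatedly calls
	// CbHandler.get() to learn which byte range to request next, then
	// streams the payload through dataReceived() into the chunk buffer.
	// A null return from get() tells the Downloader we're done.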
	/**
	 * Callback class for an instance of the Downloader, which supplies
	 * the Downloader with wanted file ranges, and handles incoming data.
	 */
	private class CbHandler implements WantRangeCallback, DataReceivedCallback
	{
		/**
		 * The current chunk being transferred.
		 */
		private FileChunk currentChunk = null;
		/**
		 * Current buffer to receive to
		 */
		private byte[] buffer = new byte[ FileChunk.CHUNK_SIZE ];
		/**
		 * Downloader object
		 */
		private final Downloader downloader;

		private CbHandler( Downloader downloader )
		{
			this.downloader = downloader;
		}

		@Override
		public boolean dataReceived( long fileOffset, int dataLength, byte[] data )
		{
			if ( currentChunk == null )
				throw new IllegalStateException( "dataReceived without current chunk" );
			if ( !currentChunk.range.contains( fileOffset, fileOffset + dataLength ) )
				throw new IllegalStateException( "dataReceived with file data out of range" );
			System.arraycopy( data, 0, buffer, (int) ( fileOffset - currentChunk.range.startOffset ), dataLength );
			return fileWritable;
		}
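		/**
		 * Hands a completely received chunk over for hashing/writing, then
		 * returns the next byte range to request from the peer, or null if
		 * no chunks are missing or the transfer was cancelled.
		 */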
		@Override
		public FileRange get()
		{
			boolean needNewBuffer = false;
			if ( currentChunk != null ) {
				try {
					if ( chunkReceivedInternal( currentChunk, buffer ) ) {
						needNewBuffer = true;
					}
				} catch ( InterruptedException e3 ) {
					LOGGER.info( "Downloader was interrupted when trying to hash" );
					currentChunk = null;
					return null;
				}
				if ( needNewBuffer ) {
					try {
						buffer = new byte[ buffer.length ];
					} catch ( OutOfMemoryError e ) {
						// Usually catching OOM errors is a bad idea, but it's quite safe here as
						// we know exactly where it happened, no hidden sub-calls through 20 objects.
						// The most likely cause here is that the hash checker/disk cannot keep up
						// writing out completed chunks, so we just sleep a bit and try again. If it
						// still fails, we exit completely.
						try {
							Thread.sleep( 6000 );
						} catch ( InterruptedException e1 ) {
							Thread.currentThread().interrupt();
							return null;
						}
						// Might raise OOM again, but THIS TIME I MEAN IT
						try {
							buffer = new byte[ buffer.length ];
						} catch ( OutOfMemoryError e2 ) {
							LOGGER.warn( "Out of JVM memory - aborting incoming " + IncomingTransferBase.this.getId() );
							downloader.sendErrorCode( "Out of RAM" );
							cancel();
						}
					}
				}
				currentChunk = null;
			}
			// Get next missing chunk
			try {
				currentChunk = chunks.getMissing();
			} catch ( InterruptedException e ) {
				Thread.currentThread().interrupt();
				cancel();
				return null;
			}
			if ( currentChunk == null ) {
				return null; // No more chunks, returning null tells the Downloader we're done
			}
			// Check remaining disk space and abort if it's too low
			if ( !hasEnoughFreeSpace() ) {
				downloader.sendErrorCode( "Out of disk space" );
				LOGGER.error( "Out of space: Cancelling upload of " + getTmpFileName().getAbsolutePath() );
				cancel();
				return null;
			}
			if ( state == TransferState.IDLE ) {
				state = TransferState.WORKING;
			}
			return currentChunk.range;
		}
	}

	/**
	 * Hand a received chunk over to the hash checker, or write it to disk
	 * directly if hash checking is not possible.
	 *
	 * @param currentChunk the chunk that was just completely received
	 * @param buffer buffer holding the chunk's payload
	 * @return true if the buffer is used internally and should not be modified in the future, false if
	 *         reuse is safe
	 * @throws InterruptedException
	 */
	final boolean chunkReceivedInternal( FileChunk currentChunk, byte[] buffer ) throws InterruptedException
	{
		boolean needNewBuffer = false;
		try {
			needNewBuffer = chunkReceived( currentChunk, buffer );
		} catch ( Exception e ) {
			LOGGER.warn( "Callback chunkReceived caused exception", e );
			needNewBuffer = true; // To be on the safe side
		}
		InterruptedException passEx = null;
		if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
			try {
				hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this,
						HashChecker.BLOCKING | HashChecker.CALC_HASH );
				return true;
			} catch ( InterruptedException e ) {
				passEx = e;
			}
		}
		// We have no hash checker, or the hasher rejected the block,
		// or the hash for the current chunk is unknown - flush to disk
		writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
		chunks.markCompleted( currentChunk, false );
		chunkStatusChanged( currentChunk );
		if ( passEx != null )
			throw passEx;
		return needNewBuffer;
	}
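	// Typical call site (illustrative; connection and pool would come from the
	// server's accept loop): addConnection() returns false when the transfer
	// already failed or the per-transfer connection limit is reached, so the
	// caller can reject the incoming connection.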
	public boolean addConnection( final Downloader connection, ExecutorService pool )
	{
		if ( state == TransferState.FINISHED ) {
			handleIncomingWhenFinished( connection, pool );
			return true;
		}
		if ( state == TransferState.ERROR )
			return false;
		synchronized ( downloads ) {
			if ( downloads.size() >= MAX_CONNECTIONS_PER_TRANSFER )
				return false;
			downloads.add( connection );
		}
		try {
			pool.execute( new Runnable() {
				@Override
				public void run()
				{
					CbHandler cbh = new CbHandler( connection );
					if ( connection.download( cbh, cbh ) ) {
						connectFails.set( 0 );
					} else {
						connectFails.incrementAndGet();
						if ( cbh.currentChunk != null ) {
							// If the download failed and we have a current chunk, put it back into
							// the queue, so it will be handled again later...
							chunks.markFailed( cbh.currentChunk );
							// Possibly queue for local copy
							if ( localCopyManager != null && cbh.currentChunk.sha1sum != null ) {
								List<byte[]> lst = new ArrayList<>( 1 );
								lst.add( cbh.currentChunk.sha1sum );
								checkLocalCopyCandidates( lst, 0 );
							}
							chunkStatusChanged( cbh.currentChunk );
						}
						LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" );
					}
					if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
						lastActivityTime.set( System.currentTimeMillis() );
					}
					synchronized ( downloads ) {
						downloads.remove( connection );
					}
					if ( chunks.isComplete() ) {
						finishUploadInternal();
					} else {
						// Keep pumping unhashed chunks into the hasher
						queueUnhashedChunk( true );
						if ( localCopyManager != null ) {
							localCopyManager.trigger();
						}
					}
				}
			} );
		} catch ( Exception e ) {
			LOGGER.warn( "Thread pool rejected the incoming file transfer", e );
			synchronized ( downloads ) {
				downloads.remove( connection );
			}
			return false;
		}
		if ( state == TransferState.IDLE ) {
			state = TransferState.WORKING;
		}
		return true;
	}

	private boolean handleIncomingWhenFinished( final Downloader connection, ExecutorService pool )
	{
		try {
			pool.execute( new Runnable() {
				@Override
				public void run()
				{
					connection.sendDoneAndClose();
				}
			} );
		} catch ( Exception e ) {
			return false;
		}
		return true;
	}

	/**
	 * Write some data to the local file. Thread safe, so we can
	 * have multiple concurrent connections.
	 *
	 * @param fileOffset offset in the file to write to
	 * @param dataLength number of bytes from data to write
	 * @param data buffer holding the data to write
	 */
	private void writeFileData( long fileOffset, int dataLength, byte[] data )
	{
		synchronized ( tmpFileHandle ) {
			if ( state != TransferState.WORKING )
				throw new IllegalStateException( "Cannot write to file if state != WORKING (is " + state.toString() + ")" );
			try {
				tmpFileHandle.seek( fileOffset );
				tmpFileHandle.write( data, 0, dataLength );
			} catch ( IOException e ) {
				LOGGER.error( "Cannot write to '" + getTmpFileName()
						+ "'. Disk full, network storage error, bad permissions, ...?", e );
				fileWritable = false;
			}
		}
		if ( !fileWritable ) {
			cancel();
		}
	}
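	/**
	 * Called by the hash checker when hashing a queued chunk has finished.
	 * Writes valid chunks to disk if they aren't already, and re-queues
	 * chunks whose hash did not match so they will be requested again.
	 */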
	@Override
	public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk )
	{
		if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
			LOGGER.debug( "hashCheckDone called in bad state " + state.name() );
			return;
		}
		switch ( result ) {
		case FAILURE:
			LOGGER.warn( "Hash check of chunk " + chunk.toString() + " could not be executed. Assuming valid :-(" );
			// Fall through
		case VALID:
			if ( chunk.isWrittenToDisk() ) {
				chunks.markCompleted( chunk, true );
			} else {
				try {
					writeFileData( chunk.range.startOffset, chunk.range.getLength(), data );
					chunks.markCompleted( chunk, true );
				} catch ( Exception e ) {
					LOGGER.warn( "Cannot write to file after hash check", e );
					chunks.markFailed( chunk );
				}
			}
			chunkStatusChanged( chunk );
			if ( chunks.isComplete() ) {
				finishUploadInternal();
			}
			break;
		case INVALID:
			LOGGER.warn( "Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch "
					+ chunk.getFailCount() + "x :-(" );
			chunks.markFailed( chunk );
			chunkStatusChanged( chunk );
			break;
		case NONE:
			LOGGER.warn( "Got hashCheckDone with result NONE" );
			break;
		}
		// A block finished, see if we can queue a new one
		queueUnhashedChunk( false );
		if ( localCopyManager != null ) {
			localCopyManager.trigger();
		}
	}

	/**
	 * Gets an unhashed chunk (if existent) and queues it for hashing
	 */
	protected void queueUnhashedChunk( boolean blocking )
	{
		FileChunk chunk = chunks.getUnhashedComplete();
		if ( chunk == null )
			return;
		byte[] data;
		try {
			data = loadChunkFromFile( chunk );
		} catch ( EOFException e1 ) {
			LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking as invalid." );
			chunks.markFailed( chunk );
			chunkStatusChanged( chunk );
			return;
		}
		if ( data == null ) {
			LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" );
			chunks.markCompleted( chunk, true );
			chunkStatusChanged( chunk );
			return;
		}
		try {
			int flags = HashChecker.CALC_HASH;
			if ( blocking ) {
				flags |= HashChecker.BLOCKING;
			}
			if ( !hashChecker.queue( chunk, data, this, flags ) ) {
				chunks.markCompleted( chunk, false );
			}
		} catch ( InterruptedException e ) {
			LOGGER.debug( "Interrupted while trying to queueUnhashedChunk" );
			chunks.markCompleted( chunk, false );
			Thread.currentThread().interrupt();
		}
	}

	final synchronized void finishUploadInternal()
	{
		if ( state == TransferState.FINISHED ) {
			return;
		}
		try {
			if ( tmpFileHandle.length() < fileSize && chunks.lastChunkIsZero() ) {
				tmpFileHandle.setLength( fileSize );
			}
		} catch ( IOException e ) {
			LOGGER.warn( "Cannot extend file size to " + fileSize );
		}
		safeClose( tmpFileHandle );
		if ( localCopyManager != null ) {
			localCopyManager.interrupt();
		}
		if ( state != TransferState.WORKING ) {
			state = TransferState.ERROR;
		} else {
			state = TransferState.FINISHED; // Races...
			if ( !finishIncomingTransfer() ) {
				state = TransferState.ERROR;
			}
		}
	}

	protected HashChecker getHashChecker()
	{
		return hashChecker;
	}

	/**
	 * Override this and return true if the destination of this download still has
	 * enough free space, so we don't run into disk full errors.
	 */
	protected abstract boolean hasEnoughFreeSpace();

	/**
	 * This will be called once the download is complete.
	 * The file handle used for writing has been closed before calling this.
	 */
	protected abstract boolean finishIncomingTransfer();

	protected abstract void chunkStatusChanged( FileChunk chunk );

	/**
	 * Called when a chunk has been received -- no validation has taken place yet.
	 *
	 * @return whether we want to use the buffered data later on, so it must not be written to
	 */
	protected boolean chunkReceived( FileChunk chunk, byte[] data )
	{
		return false;
	}

	public boolean isServerSideCopyingEnabled()
	{
		return localCopyManager != null && !localCopyManager.isPaused();
	}

	public void enableServerSideCopying( boolean serverSideCopying )
	{
		if ( localCopyManager != null ) {
			localCopyManager.setPaused( !serverSideCopying );
		}
	}
}
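/*
 * Minimal subclass sketch (illustrative only, not part of this file):
 * shows the hooks a concrete implementation has to provide.
 *
 *   class SimpleIncomingTransfer extends IncomingTransferBase
 *   {
 *       public SimpleIncomingTransfer( String id, File dest, long size, List<byte[]> hashes )
 *               throws FileNotFoundException
 *       {
 *           super( id, dest, size, hashes, null ); // no local chunk source
 *       }
 *
 *       @Override
 *       protected boolean hasEnoughFreeSpace()
 *       {
 *           return getTmpFileName().getParentFile().getUsableSpace() > getFileSize();
 *       }
 *
 *       @Override
 *       protected boolean finishIncomingTransfer()
 *       {
 *           return true; // e.g. move the temp file to its final location
 *       }
 *
 *       @Override
 *       protected void chunkStatusChanged( FileChunk chunk )
 *       {
 *           // e.g. push progress updates to interested clients
 *       }
 *   }
 */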