package org.openslx.filetransfer.util;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
import org.openslx.filetransfer.util.HashChecker.HashResult;
import org.openslx.util.ThriftUtil;

public abstract class IncomingTransferBase extends AbstractTransfer implements HashCheckCallback
{

	private static final Logger LOGGER = Logger.getLogger( IncomingTransferBase.class );

	/**
	 * Remote peer is uploading, so on our end, we have Downloaders
	 */
	private List<Downloader> downloads = new ArrayList<>();

	private final File tmpFileName;

	private final RandomAccessFile tmpFileHandle;

	private final ChunkList chunks;

	private TransferState state = TransferState.IDLE;

	private final long fileSize;

	private static final HashChecker hashChecker;

	/*
	 * Overridable constants
	 */

	protected static int MAX_CONNECTIONS_PER_TRANSFER = 2;

	/**
	 * Whether the file is (still) writable. Used for the file transfer callbacks.
	 */
	private boolean fileWritable = true;

	static {
		long maxMem = Runtime.getRuntime().maxMemory();
		if ( maxMem == Long.MAX_VALUE ) {
			// JVM reports no limit; assume 512 MiB
			maxMem = 512;
		} else {
			// Work in MiB from here on
			maxMem /= ( 1024 * 1024 );
		}
		int hashQueueLen = (int) ( maxMem / 100 );
		if ( hashQueueLen < 1 ) {
			hashQueueLen = 1;
		} else if ( hashQueueLen > 6 ) {
			hashQueueLen = 6;
		}
		HashChecker hc;
		try {
			hc = new HashChecker( "SHA-1", hashQueueLen );
		} catch ( NoSuchAlgorithmException e ) {
			hc = null;
		}
		hashChecker = hc;
	}

	public IncomingTransferBase( String transferId, File absFilePath, long fileSize, List<byte[]> blockHashes )
			throws FileNotFoundException
	{
		super( transferId );
		this.fileSize = fileSize;
		// Prepare path
		tmpFileName = absFilePath;
		tmpFileName.getParentFile().mkdirs();
		tmpFileHandle = new RandomAccessFile( absFilePath, "rw" );
		try {
			if ( tmpFileHandle.length() > fileSize ) {
				tmpFileHandle.setLength( fileSize );
			}
		} catch ( IOException e ) {
			LOGGER.debug( "File " + tmpFileName + " is too long and could not be truncated" );
		}
		chunks = new ChunkList( fileSize, blockHashes );
	}

	@Override
	public boolean isActive()
	{
		return state == TransferState.IDLE || state == TransferState.WORKING;
	}

	@Override
	public synchronized void cancel()
	{
		if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
			state = TransferState.ERROR;
		}
		synchronized ( downloads ) {
			for ( Downloader download : downloads ) {
				download.cancel();
			}
		}
		potentialFinishTime.set( 0 );
		safeClose( tmpFileHandle );
	}

	@Override
	public final int getActiveConnectionCount()
	{
		return downloads.size();
	}
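	/*
	 * Worked example for the hash queue sizing in the static initializer above
	 * (numbers are illustrative): a JVM started with -Xmx2g reports roughly
	 * 2048 MiB, so 2048 / 100 = 20 gets clamped to the maximum of 6 queue
	 * slots, while a tiny -Xmx100m heap yields 100 / 100 = 1, the minimum.
	 * Each queued chunk carries a buffer of up to FileChunk.CHUNK_SIZE bytes,
	 * which is presumably why the queue length scales with available heap.
	 */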
	public final boolean hashesEqual( List<ByteBuffer> blockHashes )
	{
		List<FileChunk> existing = chunks.getAll();
		if ( existing.size() != blockHashes.size() )
			return false;
		List<byte[]> hashes = ThriftUtil.unwrapByteBufferList( blockHashes );
		FileChunk first = existing.get( 0 );
		if ( first == null || first.getSha1Sum() == null )
			return false;
		Iterator<byte[]> it = hashes.iterator();
		for ( FileChunk existingChunk : existing ) {
			byte[] testChunk = it.next();
			if ( !Arrays.equals( testChunk, existingChunk.getSha1Sum() ) )
				return false;
		}
		return true;
	}

	/*
	 * Getters for final/private fields
	 */

	public final long getFileSize()
	{
		return fileSize;
	}

	public final File getTmpFileName()
	{
		return tmpFileName;
	}

	public final TransferState getState()
	{
		return state;
	}

	public synchronized TransferStatus getStatus()
	{
		return new TransferStatus( chunks.getStatusArray(), getState() );
	}

	public final ChunkList getChunks()
	{
		return chunks;
	}

	/**
	 * It is possible to run a download where the remote peer didn't submit
	 * the full list of block hashes yet, as it might still be hashing the file
	 * while uploading. This method should be called to update the list
	 * of block hashes. This is a cumulative call, so the list must contain
	 * all hashes starting from block 0.
	 *
	 * @param hashList (possibly incomplete) list of block hashes
	 */
	public void updateBlockHashList( List<byte[]> hashList )
	{
		if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
			LOGGER.debug( this.getId() + ": Rejecting block hash list in state " + state );
			return;
		}
		if ( hashList == null ) {
			LOGGER.debug( this.getId() + ": Rejecting null block hash list" );
			return;
		}
		chunks.updateSha1Sums( hashList );
		if ( hashChecker == null )
			return;
		for ( int cnt = 0; cnt < 3; ++cnt ) {
			FileChunk chunk = chunks.getUnhashedComplete();
			if ( chunk == null )
				break;
			byte[] data = null;
			try {
				data = loadChunkFromFile( chunk );
			} catch ( EOFException e1 ) {
				LOGGER.warn( "blockhash update: file too short, marking chunk as invalid" );
				chunks.markFailed( chunk );
				chunkStatusChanged( chunk );
				continue;
			} catch ( Exception e ) {
				LOGGER.warn( "unexpected fail while loading chunk from disk", e );
			}
			if ( data == null ) {
				LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" );
				chunks.markCompleted( chunk, true );
				chunkStatusChanged( chunk );
				continue;
			}
			try {
				if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) {
					// false == queue full, stop
					chunks.markCompleted( chunk, false );
					break;
				}
			} catch ( InterruptedException e ) {
				LOGGER.debug( "updateBlockHashList got interrupted" );
				chunks.markCompleted( chunk, false );
				Thread.currentThread().interrupt();
				return;
			}
		}
	}

	private byte[] loadChunkFromFile( FileChunk chunk ) throws EOFException
	{
		synchronized ( tmpFileHandle ) {
			if ( state != TransferState.IDLE && state != TransferState.WORKING )
				return null;
			try {
				tmpFileHandle.seek( chunk.range.startOffset );
				byte[] buffer = new byte[ chunk.range.getLength() ];
				tmpFileHandle.readFully( buffer );
				return buffer;
			} catch ( EOFException e ) {
				throw e;
			} catch ( IOException e ) {
				LOGGER.error( "Could not read chunk " + chunk.getChunkIndex() + " of File " + getTmpFileName().toString(), e );
				return null;
			}
		}
	}
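	/*
	 * Caller-side sketch of the cumulative hash update protocol handled by
	 * updateBlockHashList() above. The 'transfer' variable and the way the
	 * peer's hashes arrive are assumptions for illustration only:
	 *
	 * List<ByteBuffer> fromPeer = ...; // peer has hashed the first N blocks so far
	 * // The list is cumulative: it always starts at block 0
	 * transfer.updateBlockHashList( ThriftUtil.unwrapByteBufferList( fromPeer ) );
	 * // Call again later with a longer list as the peer hashes more blocks
	 */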
	/**
	 * Callback class for an instance of the Downloader, which supplies
	 * the Downloader with wanted file ranges, and handles incoming data.
	 */
	private class CbHandler implements WantRangeCallback, DataReceivedCallback
	{
		/**
		 * The current chunk being transferred.
		 */
		private FileChunk currentChunk = null;
		/**
		 * Current buffer to receive to
		 */
		private byte[] buffer = new byte[ FileChunk.CHUNK_SIZE ];
		/**
		 * Downloader object
		 */
		private final Downloader downloader;

		private CbHandler( Downloader downloader )
		{
			this.downloader = downloader;
		}

		@Override
		public boolean dataReceived( long fileOffset, int dataLength, byte[] data )
		{
			if ( currentChunk == null )
				throw new IllegalStateException( "dataReceived without current chunk" );
			if ( !currentChunk.range.contains( fileOffset, fileOffset + dataLength ) )
				throw new IllegalStateException( "dataReceived with file data out of range" );
			System.arraycopy( data, 0, buffer, (int) ( fileOffset - currentChunk.range.startOffset ), dataLength );
			return fileWritable;
		}

		@Override
		public FileRange get()
		{
			boolean needNewBuffer = false;
			if ( currentChunk != null ) {
				needNewBuffer = chunkReceived( currentChunk, buffer );
				if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
					try {
						hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this,
								HashChecker.BLOCKING | HashChecker.CALC_HASH );
						needNewBuffer = true;
					} catch ( InterruptedException e ) {
						chunks.markCompleted( currentChunk, false );
						currentChunk = null;
						Thread.currentThread().interrupt();
						return null;
					}
				} else {
					// We have no hash checker or the hash for the current chunk is unknown - flush to disk
					writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
					chunks.markCompleted( currentChunk, false );
					chunkStatusChanged( currentChunk );
				}
				if ( needNewBuffer ) {
					try {
						buffer = new byte[ buffer.length ];
					} catch ( OutOfMemoryError e ) {
						// Usually catching OOM errors is a bad idea, but it's quite safe here as
						// we know exactly where it happened, no hidden sub-calls through 20 objects.
						// The most likely cause here is that the hash checker/disk cannot keep up
						// writing out completed chunks, so we just sleep a bit and try again. If it
						// still fails, we exit completely.
						try {
							Thread.sleep( 6000 );
						} catch ( InterruptedException e1 ) {
							Thread.currentThread().interrupt();
							return null;
						}
						// Might raise OOM again, but THIS TIME I MEAN IT
						try {
							buffer = new byte[ buffer.length ];
						} catch ( OutOfMemoryError e2 ) {
							LOGGER.warn( "Out of JVM memory - aborting incoming " + IncomingTransferBase.this.getId() );
							downloader.sendErrorCode( "Out of RAM" );
							cancel();
							return null;
						}
					}
				}
				currentChunk = null;
			}
			// Get next missing chunk
			try {
				currentChunk = chunks.getMissing();
			} catch ( InterruptedException e ) {
				Thread.currentThread().interrupt();
				cancel();
				return null;
			}
			if ( currentChunk == null ) {
				// No more chunks; returning null tells the Downloader we're done
				return null;
			}
			// Check remaining disk space and abort if it's too low
			if ( !hasEnoughFreeSpace() ) {
				downloader.sendErrorCode( "Out of disk space" );
				LOGGER.error( "Out of space: Cancelling upload of " + getTmpFileName().getAbsolutePath() );
				cancel();
				return null;
			}
			if ( state == TransferState.IDLE ) {
				state = TransferState.WORKING;
			}
			return currentChunk.range;
		}
	}
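	/*
	 * For orientation, the Downloader drives the two callbacks above roughly
	 * like this (simplified sketch; requestRange() and readRangeData() are
	 * hypothetical stand-ins for the actual wire protocol):
	 *
	 * FileRange range;
	 * while ( ( range = wantRangeCallback.get() ) != null ) { // null = done or aborted
	 *     requestRange( range );
	 *     for ( DataPiece p : readRangeData( range ) ) {
	 *         if ( !dataReceivedCallback.dataReceived( p.offset, p.length, p.data ) )
	 *             return false; // receiver reports the file is no longer writable
	 *     }
	 * }
	 * return true;
	 */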
	public boolean addConnection( final Downloader connection, ExecutorService pool )
	{
		if ( state == TransferState.FINISHED ) {
			handleIncomingWhenFinished( connection, pool );
			return true;
		}
		if ( state == TransferState.ERROR )
			return false;
		synchronized ( downloads ) {
			if ( downloads.size() >= MAX_CONNECTIONS_PER_TRANSFER )
				return false;
			downloads.add( connection );
		}
		try {
			pool.execute( new Runnable() {
				@Override
				public void run()
				{
					CbHandler cbh = new CbHandler( connection );
					if ( connection.download( cbh, cbh ) ) {
						connectFails.set( 0 );
					} else {
						connectFails.incrementAndGet();
						if ( cbh.currentChunk != null ) {
							// If the download failed and we have a current chunk, put it back into
							// the queue, so it will be handled again later...
							chunks.markFailed( cbh.currentChunk );
							chunkStatusChanged( cbh.currentChunk );
						}
						LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" );
					}
					if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
						lastActivityTime.set( System.currentTimeMillis() );
					}
					synchronized ( downloads ) {
						downloads.remove( connection );
					}
					if ( chunks.isComplete() ) {
						finishUploadInternal();
					} else {
						// Keep pumping unhashed chunks into the hasher
						queueUnhashedChunk( true );
					}
				}
			} );
		} catch ( Exception e ) {
			LOGGER.warn( "threadpool rejected the incoming file transfer", e );
			synchronized ( downloads ) {
				downloads.remove( connection );
			}
			return false;
		}
		if ( state == TransferState.IDLE ) {
			state = TransferState.WORKING;
		}
		return true;
	}

	private boolean handleIncomingWhenFinished( final Downloader connection, ExecutorService pool )
	{
		try {
			pool.execute( new Runnable() {
				@Override
				public void run()
				{
					connection.sendDoneAndClose();
				}
			} );
		} catch ( Exception e ) {
			return false;
		}
		return true;
	}
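	/*
	 * Server-side wiring sketch for addConnection() above. The accept loop,
	 * the 'transfers' lookup map and the pool setup are assumptions, not part
	 * of this class:
	 *
	 * ExecutorService pool = Executors.newCachedThreadPool();
	 * Downloader connection = ...; // freshly accepted incoming upload
	 * IncomingTransferBase transfer = transfers.get( transferId );
	 * if ( transfer == null || !transfer.addConnection( connection, pool ) ) {
	 *     connection.cancel(); // unknown id, error state, or connection limit reached
	 * }
	 */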
	/**
	 * Write some data to the local file. Thread safe so we can
	 * have multiple concurrent connections.
	 *
	 * @param fileOffset offset in the file to write to
	 * @param dataLength number of bytes from data to write
	 * @param data the buffer holding the received payload
	 */
	private void writeFileData( long fileOffset, int dataLength, byte[] data )
	{
		synchronized ( tmpFileHandle ) {
			if ( state != TransferState.WORKING )
				throw new IllegalStateException( "Cannot write to file if state != WORKING (is " + state.toString() + ")" );
			try {
				tmpFileHandle.seek( fileOffset );
				tmpFileHandle.write( data, 0, dataLength );
			} catch ( IOException e ) {
				LOGGER.error( "Cannot write to '" + getTmpFileName()
						+ "'. Disk full, network storage error, bad permissions, ...?", e );
				fileWritable = false;
			}
		}
		if ( !fileWritable ) {
			cancel();
		}
	}

	@Override
	public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk )
	{
		if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
			LOGGER.debug( "hashCheckDone called in bad state " + state.name() );
			return;
		}
		switch ( result ) {
		case FAILURE:
			LOGGER.warn( "Hash check of chunk " + chunk.toString() + " could not be executed. Assuming valid :-(" );
			// Fall through
		case VALID:
			if ( chunk.isWrittenToDisk() ) {
				chunks.markCompleted( chunk, true );
			} else {
				try {
					writeFileData( chunk.range.startOffset, chunk.range.getLength(), data );
					chunks.markCompleted( chunk, true );
				} catch ( Exception e ) {
					LOGGER.warn( "Cannot write to file after hash check", e );
					chunks.markFailed( chunk );
				}
			}
			chunkStatusChanged( chunk );
			if ( chunks.isComplete() ) {
				finishUploadInternal();
			}
			break;
		case INVALID:
			LOGGER.warn( "Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch "
					+ chunk.getFailCount() + "x :-(" );
			chunks.markFailed( chunk );
			chunkStatusChanged( chunk );
			break;
		}
		// A block finished, see if we can queue a new one
		queueUnhashedChunk( false );
	}

	/**
	 * Gets an unhashed chunk (if existent) and queues it for hashing
	 */
	protected void queueUnhashedChunk( boolean blocking )
	{
		FileChunk chunk = chunks.getUnhashedComplete();
		if ( chunk == null )
			return;
		byte[] data;
		try {
			data = loadChunkFromFile( chunk );
		} catch ( EOFException e1 ) {
			LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking as invalid." );
			chunks.markFailed( chunk );
			chunkStatusChanged( chunk );
			return;
		}
		if ( data == null ) {
			LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" );
			chunks.markCompleted( chunk, true );
			chunkStatusChanged( chunk );
			return;
		}
		try {
			int flags = HashChecker.CALC_HASH;
			if ( blocking ) {
				flags |= HashChecker.BLOCKING;
			}
			if ( !hashChecker.queue( chunk, data, this, flags ) ) {
				chunks.markCompleted( chunk, false );
			}
		} catch ( InterruptedException e ) {
			LOGGER.debug( "Interrupted while trying to queueUnhashedChunk" );
			chunks.markCompleted( chunk, false );
			Thread.currentThread().interrupt();
		}
	}

	private synchronized void finishUploadInternal()
	{
		if ( state == TransferState.FINISHED ) {
			return;
		}
		safeClose( tmpFileHandle );
		if ( state != TransferState.WORKING ) {
			state = TransferState.ERROR;
		} else {
			state = TransferState.FINISHED; // Races...
			if ( !finishIncomingTransfer() ) {
				state = TransferState.ERROR;
			}
		}
	}

	protected HashChecker getHashChecker()
	{
		return hashChecker;
	}

	/**
	 * Override this and return true if the destination of this download
	 * still has enough free space, so we don't run into disk-full errors.
	 */
	protected abstract boolean hasEnoughFreeSpace();

	/**
	 * This will be called once the download is complete.
	 * The file handle used for writing has been closed before calling this.
	 */
	protected abstract boolean finishIncomingTransfer();

	protected abstract void chunkStatusChanged( FileChunk chunk );

	/**
	 * Called when a chunk has been received -- no validation has taken place yet.
	 *
	 * @return true if the callee keeps using the buffered data; the buffer
	 *         will then not be reused for further writes
	 */
	protected boolean chunkReceived( FileChunk chunk, byte[] data )
	{
		return false;
	}

}
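/*
 * Minimal subclass sketch showing what a concrete incoming transfer has to
 * provide. The class name, the free-space threshold and the rename-on-finish
 * step are illustrative assumptions:
 *
 * class SimpleIncomingTransfer extends IncomingTransferBase
 * {
 *     public SimpleIncomingTransfer( String id, File dest, long size, List<byte[]> hashes )
 *             throws FileNotFoundException
 *     {
 *         super( id, dest, size, hashes );
 *     }
 *
 *     @Override
 *     protected boolean hasEnoughFreeSpace()
 *     {
 *         // Demand some headroom on the destination file system
 *         return getTmpFileName().getParentFile().getUsableSpace() > 1024l * 1024l * 1024l;
 *     }
 *
 *     @Override
 *     protected boolean finishIncomingTransfer()
 *     {
 *         // The write handle is already closed; move the finished file into place
 *         return getTmpFileName().renameTo( new File( getTmpFileName().getPath() + ".done" ) );
 *     }
 *
 *     @Override
 *     protected void chunkStatusChanged( FileChunk chunk )
 *     {
 *         // e.g. push progress updates to watching clients
 *     }
 * }
 */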