path: root/src/main/java/org/openslx/filetransfer/util/ChunkList.java

package org.openslx.filetransfer.util;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.zip.CRC32;

import org.apache.log4j.Logger;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.util.ThriftUtil;

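/**
 * Keeps track of the chunks of a file during a transfer: which chunks are still
 * missing, which are currently pending (being transferred or hash-checked), and
 * which are complete. A rough usage sketch follows; transferChunk() and
 * verifyChunkHash() are illustrative helpers only, not part of this class:
 *
 * <pre>{@code
 * ChunkList list = new ChunkList( fileSize, sha1Sums );
 * FileChunk chunk;
 * while ( ( chunk = list.getMissing() ) != null ) {
 * 	boolean ok = transferChunk( chunk ); // hypothetical: send/receive the chunk's bytes
 * 	if ( ok ) {
 * 		list.markCompleted( chunk, false ); // hash check (if any) still outstanding
 * 	} else {
 * 		list.markFailed( chunk );
 * 	}
 * }
 * // A hash-check worker could then verify chunks that still need it:
 * while ( ( chunk = list.getUnhashedComplete() ) != null ) {
 * 	boolean match = verifyChunkHash( chunk ); // hypothetical: hash the data, compare to chunk.getSha1Sum()
 * 	if ( match ) {
 * 		list.markCompleted( chunk, true );
 * 	} else {
 * 		list.markFailed( chunk ); // goes back to the missing queue
 * 	}
 * }
 * boolean done = list.isComplete();
 * }</pre>
 */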
public class ChunkList
{

	private static final Logger LOGGER = Logger.getLogger( ChunkList.class );

	/**
	 * List of all chunks in proper order, so any chunk can be looked up quickly by index
	 */
	private final List<FileChunk> allChunks;

	/**
	 * Chunks that are missing from the file
	 */
	private final LinkedList<FileChunk> missingChunks = new LinkedList<>();

	/**
	 * Chunks that are currently being uploaded or hash-checked
	 */
	private final List<FileChunk> pendingChunks = new LinkedList<>();

	private final List<FileChunk> completeChunks = new ArrayList<>( 100 );

	// 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying, 5 = hashing
	private final ByteBuffer statusArray;

	/**
	 * True if at least one block has a checksum set
	 */
	private boolean hasChecksum = false;

	// Do we need to keep valid chunks, or chunks that failed too many times?

	public ChunkList( long fileSize, List<byte[]> sha1Sums )
	{
		FileChunk.createChunkList( missingChunks, fileSize, sha1Sums );
		statusArray = ByteBuffer.allocate( missingChunks.size() );
		allChunks = Collections.unmodifiableList( new ArrayList<>( missingChunks ) );
	}

	/**
	 * Update the sha1sums of all chunks. This is meant to be used if you passed an incomplete
	 * list (with null elements) before, so you don't have to hash the whole file before starting
	 * the upload, but can keep updating the sums periodically while the upload is running.
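	 * <p>
	 * A sketch of how a background hasher might feed this method (the surrounding
	 * names are illustrative, not part of this class):
	 * <pre>{@code
	 * List<byte[]> sums = new ArrayList<>( Collections.nCopies( chunkCount, (byte[])null ) );
	 * // ... whenever another chunk has been hashed locally ...
	 * sums.set( chunkIndex, sha1Digest );
	 * int firstNew = chunkList.updateSha1Sums( sums );
	 * if ( firstNew != -1 ) {
	 * 	// chunks from firstNew onwards now have a checksum for the first time
	 * }
	 * }</pre>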
	 * 
	 * @param sha1Sums list of sums
	 * @return lowest index of chunk that didn't have a sha1sum before, -1 if no new ones  
	 */
	public synchronized int updateSha1Sums( List<byte[]> sha1Sums )
	{
		int index = 0;
		int firstNew = -1;
		for ( byte[] sum : sha1Sums ) {
			if ( index >= allChunks.size() )
				break;
			if ( sum != null ) {
				FileChunk chunk = allChunks.get( index );
				if ( chunk.setSha1Sum( sum ) ) {
					if ( firstNew == -1 ) {
						firstNew = index;
					}
					if ( chunk.status == ChunkStatus.MISSING && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, sum ) ) {
						markMissingAsComplete( index );
					}
				}
				if ( !hasChecksum ) {
					hasChecksum = true;
				}
			}
			index++;
		}
		return firstNew;
	}

	/**
	 * Get CRC32 list in DNBD3 format. All checksums are little
	 * endian and prefixed by the crc32 sum of the list itself.
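	 * <p>
	 * For illustration (a sketch, not part of this class), a consumer could verify
	 * the leading master checksum of the returned array like this:
	 * <pre>{@code
	 * byte[] list = chunkList.getDnbd3Crc32List();
	 * CRC32 crc = new CRC32();
	 * crc.update( list, 4, list.length - 4 ); // CRC over the per-chunk sums only
	 * int expected = (int)crc.getValue();
	 * int stored = ( list[0] & 0xff ) | ( ( list[1] & 0xff ) << 8 )
	 * 		| ( ( list[2] & 0xff ) << 16 ) | ( ( list[3] & 0xff ) << 24 ); // little endian
	 * boolean valid = ( expected == stored );
	 * }</pre>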
	 */
	public synchronized byte[] getDnbd3Crc32List() throws IOException
	{
		byte[] buffer = new byte[ allChunks.size() * 4 + 4 ]; // 4 bytes per chunk, plus 4 for the master checksum
		long nextChunkOffset = 0;
		int nextCrcArrayPos = 4;
		for ( FileChunk c : allChunks ) {
			if ( c.crc32 == null ) {
				throw new IllegalStateException( "Called on ChunkList that doesn't have crc32 enabled" );
			}
			if ( c.range.startOffset != nextChunkOffset ) {
				throw new IllegalStateException( "Chunk list is not in order or has wrong chunk size" );
			}
			nextChunkOffset += FileChunk.CHUNK_SIZE;
			c.getCrc32Le( buffer, nextCrcArrayPos );
			nextCrcArrayPos += 4;
		}
		CRC32 masterCrc = new CRC32();
		masterCrc.update( buffer, 4, buffer.length - 4 );
		int value = (int)masterCrc.getValue();
		buffer[3] = (byte) ( value >>> 24 );
		buffer[2] = (byte) ( value >>> 16 );
		buffer[1] = (byte) ( value >>> 8 );
		buffer[0] = (byte)value;
		return buffer;
	}

	/**
	 * Get a missing chunk, marking it pending.
	 * 
	 * @return a previously missing chunk, now marked as uploading, or null if none is available
	 * @throws InterruptedException
	 */
	public synchronized FileChunk getMissing() throws InterruptedException
	{
		if ( missingChunks.isEmpty() && pendingChunks.isEmpty() )
			return null;
		if ( missingChunks.isEmpty() ) {
			this.wait( 6000 );
			if ( missingChunks.isEmpty() )
				return null;
		}
		FileChunk c = missingChunks.removeFirst();
		c.setStatus( ChunkStatus.UPLOADING );
		pendingChunks.add( c );
		return c;
	}

	/**
	 * Returns true if this list contains a chunk with state MISSING,
	 * which means the chunk doesn't have a sha1 known to exist in
	 * another image.
	 * @return true if at least one chunk is in state MISSING
	 */
	public synchronized boolean hasLocallyMissingChunk()
	{
		return !missingChunks.isEmpty() && missingChunks.peekFirst().status == ChunkStatus.MISSING;
	}

	/**
	 * Get a chunk that is marked as candidate for copying.
	 * Returns null if none are available.
	 */
	public synchronized FileChunk getCopyCandidate()
	{
		if ( missingChunks.isEmpty() )
			return null;
		FileChunk last = missingChunks.removeLast();
		if ( last.status != ChunkStatus.QUEUED_FOR_COPY ) {
			// Put back
			missingChunks.add( last );
			return null;
		}
		// Is a candidate
		last.setStatus( ChunkStatus.COPYING );
		pendingChunks.add( last );
		return last;
	}

	/**
	 * Mark the given chunks for potential local copying instead of receiving them
	 * from peer.
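	 * <p>
	 * Intended flow, roughly (sketch; copyFromLocalStorage() is a hypothetical helper):
	 * <pre>{@code
	 * chunkList.markLocalCopyCandidates( sources );
	 * FileChunk candidate;
	 * while ( ( candidate = chunkList.getCopyCandidate() ) != null ) {
	 * 	if ( copyFromLocalStorage( candidate ) ) {
	 * 		chunkList.markCompleted( candidate, false ); // still needs hash check
	 * 	} else {
	 * 		chunkList.markFailed( candidate );
	 * 	}
	 * }
	 * }</pre>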
	 * @param sources
	 */
	public synchronized void markLocalCopyCandidates( List<ChunkSource> sources )
	{
		for ( ChunkSource src : sources ) {
			try {
				if ( src.sourceCandidates.isEmpty() )
					continue;
				List<FileChunk> append = null;
				for ( Iterator<FileChunk> it = missingChunks.iterator(); it.hasNext(); ) {
					FileChunk chunk = it.next();
					if ( !Arrays.equals( chunk.sha1sum, src.sha1sum ) )
						continue;
					if ( chunk.status == ChunkStatus.QUEUED_FOR_COPY )
						continue;
					// Bingo
					if ( append == null ) {
						append = new ArrayList<>( 20 );
					}
					it.remove();
					chunk.setStatus( ChunkStatus.QUEUED_FOR_COPY );
					chunk.setSource( src );
					append.add( chunk );
				}
				if ( append != null ) {
					// Move all the chunks queued for copying to the end of the list, so when
					// we getMissing() a chunk for upload from client, these ones would only
					// come last, in case reading from storage and writing back is really slow
					missingChunks.addAll( append );
				}
			} catch ( Exception e ) {
				LOGGER.warn( "chunk clone list is messed up", e );
			}
		}
	}

	/**
	 * Get the block status as byte representation.
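	 * <p>
	 * One byte per chunk, using the numeric status values noted on the statusArray
	 * field (0 = complete, 1 = missing, ...). For example, a caller might count the
	 * chunks that are not yet complete (sketch):
	 * <pre>{@code
	 * byte[] status = chunkList.getStatusArray().array();
	 * int unfinished = 0;
	 * for ( byte b : status ) {
	 * 	if ( b != ChunkStatus.COMPLETE.val ) {
	 * 		unfinished++;
	 * 	}
	 * }
	 * }</pre>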
	 */
	public synchronized ByteBuffer getStatusArray()
	{
		byte[] array = statusArray.array();
		for ( int i = 0; i < array.length; ++i ) {
			FileChunk chunk = allChunks.get( i );
			ChunkStatus status = chunk.getStatus();
			if ( hasChecksum && status == ChunkStatus.COMPLETE && chunk.getSha1Sum() == null ) {
				array[i] = ChunkStatus.HASHING.val;
			} else {
				array[i] = chunk.getStatus().val;
			}
		}
		return statusArray;
	}

	/**
	 * Get completed chunks as list
	 * 
	 * @return List containing all successfully transferred chunks
	 */
	public synchronized List<FileChunk> getCompleted()
	{
		return new ArrayList<>( completeChunks );
	}

	/**
	 * Set status of blocks according to given "ismissing" list. Intended to be called
	 * right after creating the list, in case we have a local file already and want to
	 * resume downloading.
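	 * <p>
	 * For example (sketch; how the flag list is persisted and loaded is up to the caller):
	 * <pre>{@code
	 * List<Boolean> isMissing = loadSavedFlagList(); // hypothetical persistence helper
	 * chunkList.resumeFromStatusList( isMissing, localFile.length() );
	 * }</pre>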
	 */
	public synchronized void resumeFromStatusList( List<Boolean> statusList, long fileLength )
	{
		if ( !completeChunks.isEmpty() || !pendingChunks.isEmpty() ) {
			LOGGER.warn( "Inconsistent state: resume called when not all chunks are marked missing" );
		}
		int index = 0;
		for ( Boolean missing : statusList ) {
			FileChunk chunk = allChunks.get( index );
			if ( fileLength != -1 && fileLength < chunk.range.endOffset )
				break; // Stop, file is shorter than end of this chunk
			if ( missingChunks.remove( chunk ) || pendingChunks.remove( chunk ) ) {
				completeChunks.add( chunk );
			}
			if ( missing ) {
				// Trigger hashing
				chunk.setStatus( ChunkStatus.HASHING );
			} else {
				// Assume complete
				chunk.setStatus( ChunkStatus.COMPLETE );
			}
			index++;
		}
	}

	/**
	 * Get a chunk that is marked complete, has a sha1 hash, but has not been hash-checked yet.
	 * 
	 * @return chunk
	 */
	public synchronized FileChunk getUnhashedComplete()
	{
		for ( Iterator<FileChunk> it = completeChunks.iterator(); it.hasNext(); ) {
			FileChunk chunk = it.next();
			if ( chunk.sha1sum != null && chunk.status == ChunkStatus.HASHING ) {
				it.remove();
				pendingChunks.add( chunk );
				return chunk;
			}
		}
		return null;
	}

	/**
	 * Mark a chunk currently transferring as successfully transferred.
	 * 
	 * @param c The chunk in question
	 */
	public synchronized void markCompleted( FileChunk c, boolean hashCheckSuccessful )
	{
		if ( !pendingChunks.remove( c ) ) {
			LOGGER.warn( "Inconsistent state: markCompleted called for Chunk " + c.toString()
					+ ", but chunk is not marked as currently transferring!" );
			return;
		}
		c.setStatus( ( hashCheckSuccessful || c.getSha1Sum() == null ) ? ChunkStatus.COMPLETE : ChunkStatus.HASHING );
		completeChunks.add( c );
		this.notifyAll();
	}

	/**
	 * Mark a chunk currently transferring or being hash checked as failed
	 * transfer. This increases its fail count and re-adds it to the list of
	 * missing chunks.
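	 * <p>
	 * A caller might use the returned fail count to give up at some point
	 * (sketch; MAX_RETRIES is an illustrative constant, not part of this class):
	 * <pre>{@code
	 * int fails = chunkList.markFailed( chunk );
	 * if ( fails == -1 || fails > MAX_RETRIES ) {
	 * 	// abort or log, depending on the caller's policy
	 * }
	 * }</pre>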
	 * 
	 * @param c The chunk in question
	 * @return Number of times transfer of this chunk failed
	 */
	public synchronized int markFailed( FileChunk c )
	{
		if ( !pendingChunks.remove( c ) ) {
			LOGGER.warn( "Inconsistent state: markFailed called for Chunk " + c.toString()
					+ ", but chunk is not marked as currently transferring!" );
			return -1;
		}
		// Add as first element so it will be re-transmitted immediately
		c.setStatus( ChunkStatus.MISSING );
		missingChunks.addFirst( c );
		this.notifyAll();
		return c.incFailed();
	}

	/**
	 * Mark a missing chunk as complete.
	 */
	private synchronized boolean markMissingAsComplete( int index )
	{
		FileChunk chunk = allChunks.get( index );
		if ( completeChunks.contains( chunk ) )
			return true;
		if ( !missingChunks.remove( chunk ) ) {
			LOGGER.warn( "Inconsistent state: markMissingAsComplete called for chunk " + chunk.toString() + " (indexed as " + index
					+ ") which is not missing" );
			return false;
		}
		chunk.setStatus( ChunkStatus.COMPLETE );
		completeChunks.add( chunk );
		this.notifyAll();
		return true;
	}

	/**
	 * Check if all blocks in this list are marked as successfully transferred. If a complete chunk is
	 * marked as "hashing", or if there are some complete chunks without a sha1sum and some with a
	 * sha1sum, the transfer is considered incomplete.
	 * 
	 * @return true iff transfer is complete
	 */
	public synchronized boolean isComplete()
	{
		if ( !missingChunks.isEmpty() || !pendingChunks.isEmpty() )
			return false;
		boolean sawWithHash = false;
		for ( FileChunk chunk : completeChunks ) {
			if ( chunk.status == ChunkStatus.HASHING )
				return false;
			if ( chunk.sha1sum != null ) {
				sawWithHash = true;
			} else if ( sawWithHash ) {
				return false;
			}
		}
		return true;
	}

	@Override
	public synchronized String toString()
	{
		StringBuilder sb = new StringBuilder();
		sb.append( '{' );
		for ( FileChunk chunk : allChunks ) {
			sb.append( '[' );
			sb.append( chunk.getChunkIndex() );
			if ( chunk.getSha1Sum() != null )
				sb.append( '+' );
			switch ( chunk.status ) {
			case COMPLETE:
				sb.append( 'C' );
				break;
			case COPYING:
				sb.append( '>' );
				break;
			case HASHING:
				sb.append( 'H' );
				break;
			case MISSING:
				sb.append( 'M' );
				break;
			case QUEUED_FOR_COPY:
				sb.append( 'Q' );
				break;
			case UPLOADING:
				sb.append( 'P' );
				break;
			default:
				sb.append( '?' );
				break;

			}
			sb.append( '|' );
			if ( missingChunks.contains( chunk ) )
				sb.append( 'M' );
			if ( pendingChunks.contains( chunk ) )
				sb.append( 'P' );
			if ( completeChunks.contains( chunk ) )
				sb.append( 'C' );
			sb.append( ']' );
		}
		sb.append( '}' );
		return sb.toString();
	}

	public synchronized boolean isEmpty()
	{
		return allChunks.isEmpty();
	}

	public List<FileChunk> getAll()
	{
		return allChunks;
	}

	public synchronized String getQueueName( FileChunk chunk )
	{
		if ( missingChunks.contains( chunk ) )
			return "missing";
		if ( pendingChunks.contains( chunk ) )
			return "pending";
		if ( completeChunks.contains( chunk ) )
			return "completed";
		return "NOQUEUE";
	}

	public static boolean hashListsEqualFcBb( List<FileChunk> one, List<ByteBuffer> two )
	{
		return hashListsEqualFcArray( one, ThriftUtil.unwrapByteBufferList( two ) );
	}

	public static boolean hashListsEqualFcArray( List<FileChunk> one, List<byte[]> two )
	{
		if ( one.size() != two.size() )
			return false;
		if ( one.isEmpty() )
			return false; // nothing to compare, so equality cannot be confirmed
		FileChunk first = one.get( 0 );
		if ( first == null || first.getSha1Sum() == null )
			return false;
		Iterator<byte[]> it = two.iterator();
		for ( FileChunk existingChunk : one ) {
			byte[] testChunk = it.next();
			if ( !Arrays.equals( testChunk, existingChunk.getSha1Sum() ) )
				return false;
		}
		return true;
	}

	public static boolean hashListsEqualBbBb( List<ByteBuffer> list1, List<ByteBuffer> list2 )
	{
		return hashListsEqualBbArray( list1, ThriftUtil.unwrapByteBufferList( list2 ) );
	}

	public static boolean hashListsEqualBbArray( List<ByteBuffer> bufferList, List<byte[]> arrayList )
	{
		return hashListsEqualArray( ThriftUtil.unwrapByteBufferList( bufferList ), arrayList );
	}

	public static boolean hashListsEqualArray( List<byte[]> list1, List<byte[]> list2 )
	{
		if ( list1.size() != list2.size() )
			return false;
		Iterator<byte[]> it1 = list1.iterator();
		Iterator<byte[]> it2 = list2.iterator();
		while ( it1.hasNext() && it2.hasNext() ) {
			if ( !Arrays.equals( it1.next(), it2.next() ) )
				return false;
		}
		return true;
	}

	/**
	 * Returns true if the last chunk is exactly 16MiB and all zeros, i.e. its sha1sum
	 * equals the hash of an all-zero block.
	 * @return true if the last chunk is known to be a full-size zero block
	 */
	public boolean lastChunkIsZero()
	{
		if ( allChunks.isEmpty() )
			return false;
		FileChunk chunk = allChunks.get( allChunks.size() - 1 );
		return chunk.sha1sum != null && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, chunk.sha1sum );
	}

}