summaryrefslogblamecommitdiffstats
path: root/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
blob: e427d0649e3c1f259f36a1db418b8b1f66cdb52d (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11










                                              
                                                


                                                       
 

                                           

                                                            
                                         

                                               

                                                  

                                        

                             
                                                
 
                                                                                            
 















                                                                                                            
                                          
                                       

                                                   
                                           
 

                                                    
                                                                                                           
                     


                                                                                                                      
                                
                 






                                                                           


                                                              
                                            


                 

                           
                     
                                                      

                                                                                   
                                              
                                 
                                            
                                     
                                                                                          
                                                                           

                                                                         

                                                                                                                            
                                                                              
                                 

                                                                                   
                                              
                                 













                                                                                                                                
                                         
                                              






                                             
                              
                                                     


                                  
                                                 






                                                              
 





                                                                            
                 
         
 

















                                                                                   
                         



                                                                                                                        
                         
                 






                                                                                            
                                                      
                                         






                                                                                 
                                         








                                                                                               
                                                                  

                                         

                           

         




                                                                
                                                           
                                         
                                            
                                                                                            

                                                                                                           
                                    









                                                                                                                          
                 
                            

         
 
package org.openslx.dozmod.filetransfer;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.dozmod.thrift.Session;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.GrowingThreadPoolExecutor;
import org.openslx.util.PrioThreadFactory;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;
import org.openslx.util.Util;

public class AsyncHashGenerator extends Thread {

	private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class);

	private static final ThreadPoolExecutor pool = new GrowingThreadPoolExecutor(1, Runtime.getRuntime()
			.availableProcessors(), 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(4),
			new PrioThreadFactory("HashGen"));

	private static final ThreadLocal<MessageDigest> sha1 = new ThreadLocal<MessageDigest>() {
		@Override
		protected MessageDigest initialValue() {
			try {
				return MessageDigest.getInstance("SHA-1");
			} catch (NoSuchAlgorithmException e) {
				LOGGER.warn("No SHA-1 MD available. Cannot hash file", e);
				return null;
			}
		}
	};
	
	private String uploadToken = null;
	private int finishedBlocks = 0;
	private final RandomAccessFile file;
	private final List<ByteBuffer> blockHashes;
	private final List<FileChunk> list;

	private volatile boolean isCanceled = false;

	public AsyncHashGenerator(File uploadFile) throws FileNotFoundException, NoSuchAlgorithmException {
		try {
			file = new RandomAccessFile(uploadFile, "r");
		} catch (FileNotFoundException e) {
			LOGGER.warn("Could not open file for hash-checking. Will not send checksums to satellite", e);
			throw e;
		}
		list = new ArrayList<>();
		FileChunk.createChunkList(list, uploadFile.length(), null);
		blockHashes = new ArrayList<>(list.size());
		setDaemon(true);
		setName("HashGenerator");
	}

	public void setUploadToken(String token) {
		if (!isCanceled && this.uploadToken == null) {
			this.uploadToken = token;
			submitHashes(false);
		}
	}

	@Override
	public void run() {
		try {
			for (FileChunk chunk : list) {
				if (isCanceled) {
					LOGGER.debug("Cancelled chunk reader (1)");
					break;
				}
				Block block;
				try {
					byte[] buffer = new byte[chunk.range.getLength()];
					file.seek(chunk.range.startOffset);
					file.readFully(buffer);
					block = new Block(chunk, buffer);
				} catch (IOException e) {
					LOGGER.warn("Could not read file chunk " + chunk.getChunkIndex() + ", skipping", e);
					block = new Block(chunk, new byte[0]);
				}
				if (isCanceled) {
					LOGGER.debug("Cancelled chunk reader (2)");
					break;
				}
				for (;;) {
					if (pool.isTerminating() || pool.isTerminated()) {
						Thread.currentThread().interrupt();
						return;
					}
					try {
						pool.execute(block);
						// Don't hash too furiously in the background if the upload didn't start yet
						if (uploadToken == null && chunk.range.startOffset > FileChunk.CHUNK_SIZE * 4) {
							Util.sleep(200);
						}
					} catch (RejectedExecutionException e) {
						Util.sleep(100);
						continue;
					}
					break;
				}
			}
		} finally {
			Util.safeClose(file);
		}
	}

	public void cancel() {
		LOGGER.debug("Cancelled externally");
		isCanceled = true;
	}

	private class Block implements Runnable {
		public final FileChunk chunk;
		public final byte[] buffer;

		public Block(FileChunk chunk, byte[] buffer) {
			this.chunk = chunk;
			this.buffer = buffer;
		}

		@Override
		public void run() {
			MessageDigest digester = sha1.get();
			digester.update(buffer, 0, chunk.range.getLength());
			byte[] hash = digester.digest();
			hashDone(chunk, hash);
		}
	}

	/**
	 * Called by worker thread when a block has been hashed.
	 * This means this method is not running in the currentAsyncHashGenerator
	 * thread but one of the workers.
	 * 
	 * @param chunk
	 * @param hash
	 */
	private void hashDone(FileChunk chunk, byte[] hash) {
		int blockIndex = chunk.getChunkIndex();
		synchronized (blockHashes) {
			while (blockHashes.size() < blockIndex) {
				blockHashes.add(null);
			}
			if (blockHashes.size() == blockIndex) {
				blockHashes.add(ByteBuffer.wrap(hash));
			} else {
				blockHashes.set(blockIndex, ByteBuffer.wrap(hash));
			}
			if (blockIndex == finishedBlocks) {
				while (finishedBlocks < blockHashes.size() && blockHashes.get(finishedBlocks) != null) {
					finishedBlocks++;
				}
			}
		}
		if (blockIndex % 20 == 0 || finishedBlocks == list.size()) {
			if (blockIndex + 1 == list.size()) {
				// Last block was hashed - make sure list gets to the server
				LOGGER.debug("Hashing done");
				for (int i = 0; i < 10; ++i) {
					if (submitHashes(true)) {
						LOGGER.debug("Hashes sent to server");
						break;
					}
					LOGGER.debug("Sending hashes failed...");
					try {
						Thread.sleep(2000);
						continue;
					} catch (InterruptedException e) {
						interrupt();
						return;
					}
				}
				return;
			}
			// Mid-hashing - update server side
			QuickTimer.scheduleOnce(new Task() {
				@Override
				public void fire() {
					if (!submitHashes(false)) {
						LOGGER.warn("Server rejected block hash list");
						isCanceled = true;
					}
				}
			});
		}	
	}

	/**
	 * Submit current list of hashes.
	 * 
	 * @return false if the token is not known to the server
	 */
	private boolean submitHashes(boolean mustSucceed) {
		List<ByteBuffer> subList;
		synchronized (blockHashes) {
			subList = new ArrayList<>( blockHashes.subList(0, finishedBlocks) );
		}
		if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list
			return true;
		try {
			ThriftManager.getSatClient().updateBlockHashes(uploadToken, subList, Session.getSatelliteToken());
		} catch (TInvalidTokenException e) {
			LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!");
			isCanceled = true;
			return false;
		} catch (TException e) {
			LOGGER.warn("Unknown exception when submitting hashList to sat", e);
			if (mustSucceed)
				return false;
		}
		return true;
	}

}