summaryrefslogblamecommitdiffstats
path: root/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
blob: cf593200a95aa6ccef8b7c6f215aa54619ea7f17 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11










                                              
                                                


                                                       
 

                                           
                                                            
                                         

                                               

                                                  

                             
                                                
 
                                                                                            
 



                                                                                                     
 
                                                                                                          










                                                                                          
                                          
                                       
                                            


                                                                                    

                                            
 

                                                    
                                                                                                           
                     


                                                                                                                      
                                
                 



                                                                                



                                         


                                                              
                                            
                 
                                

         

                           
                                                           
                     
                                                           

                                                                                   
                                              
                                 
                                            
                                     







                                                                                                 
                                                                           

                                                                         

                                                                                                                            
                                                                              
                                 

                                                                                   
                                              
                                 





                                                                                            
                                          

                                                                                                                                             



                                                                                   
                                                                              
                                                                                                                            
                                                                                                       


                                                                                

                                                                                                                  
                                                         
                                         
                                              



                                             

                                           


                 
                              
                                                     


                                  


                                                               
                                                 






                                                              
 

                                   
                                                                     


                                                                            
                 
         
 
           
                                                                






                                                                                 








                                                                         
                         

                                                                       
                                
                                                                                   
                         





                                                                                                                        
                                 
                         


                                                                                                                                                  
                 







                                                                                    
                                 


                                                                         
                         
                                                  
                                                           




                                                                                       



















                                                                                        

         




                                                                
                                                           
                                         










                                                                                                                                       

                                                                                                           
                                    
                     
                                                                             
                                                                                                                          
                                                                                



                                                                                                             
                                       


                                                                                            
                 
                            

         
 
package org.openslx.dozmod.filetransfer;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.dozmod.thrift.Session;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.GrowingThreadPoolExecutor;
import org.openslx.util.PrioThreadFactory;
import org.openslx.util.Util;

/**
 * Computes per-chunk SHA-1 hashes of a file that is being uploaded and reports
 * them to the satellite server.
 * <p>
 * This thread reads the file chunk by chunk (chunk layout as produced by
 * {@link FileChunk#createChunkList}) and hands every chunk to a shared worker
 * pool for hashing. Finished hashes are collected by chunk index; the
 * contiguous prefix of finished hashes is sent to the satellite via
 * {@code updateBlockHashes} once an upload token has been provided through
 * {@link #setUploadToken(String)}.
 */
public class AsyncHashGenerator extends Thread {

	private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class);

	/**
	 * Shared pool that hashes chunks. With a queue of only two entries and
	 * CallerRunsPolicy, a saturated pool makes the submitting (reader) thread run
	 * the hash job itself, which throttles reading (assuming
	 * GrowingThreadPoolExecutor keeps the standard rejection semantics).
	 */
	private static final ThreadPoolExecutor HASH_WORK_POOL = new GrowingThreadPoolExecutor(1,
			Math.max(1, Runtime.getRuntime().availableProcessors() - 1),
			10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2),
			new PrioThreadFactory("HashGen"), new ThreadPoolExecutor.CallerRunsPolicy());

	/**
	 * A MessageDigest carries per-hash state between update() and digest(), so
	 * every worker thread gets its own instance.
	 */
	private static final ThreadLocal<MessageDigest> SHA1_DIGESTER = new ThreadLocal<MessageDigest>() {
		@Override
		protected MessageDigest initialValue() {
			try {
				return MessageDigest.getInstance("SHA-1");
			} catch (NoSuchAlgorithmException e) {
				LOGGER.warn("No SHA-1 MD available. Cannot hash file", e);
				return null;
			}
		}
	};

	/**
	 * Token of the running upload; null until set. Written by the caller of
	 * setUploadToken, read by the reader thread and hash workers.
	 */
	private volatile String uploadToken = null;
	/** Length of the contiguous prefix of chunks whose hash is known. Guarded by chunkHashes. */
	private int finishedChunks = 0;
	private final RandomAccessFile file;
	/** Hash per chunk index; null entries are not hashed yet. Also serves as the lock for hash state. */
	private final List<ByteBuffer> chunkHashes;
	private final List<FileChunk> chunkList;
	private long nextReadMsg, nextDoneMsg, nextSendingMsg; // rate-limits debug messages to one per 30s
	/** Set by the reader thread once it stopped reading (finished, cancelled or aborted). */
	private volatile boolean readingDone = false;
	/** Set after the last chunk has been hashed and its submission was attempted. */
	private volatile boolean hashingDone = false;

	private volatile boolean isCanceled = false;

	/**
	 * Open the given file for hashing and prepare its chunk list. The thread is
	 * created as a daemon but not started.
	 * 
	 * @param uploadFile file to hash
	 * @throws FileNotFoundException if the file cannot be opened for reading
	 * @throws NoSuchAlgorithmException declared for callers; not thrown directly
	 *             by this constructor
	 */
	public AsyncHashGenerator(File uploadFile) throws FileNotFoundException, NoSuchAlgorithmException {
		try {
			file = new RandomAccessFile(uploadFile, "r");
		} catch (FileNotFoundException e) {
			LOGGER.warn("Could not open file for hash-checking. Will not send checksums to satellite", e);
			throw e;
		}
		LOGGER.debug("Opened file for hashing");
		chunkList = new ArrayList<>();
		FileChunk.createChunkList(chunkList, uploadFile.length(), null);
		chunkHashes = new ArrayList<>(chunkList.size());
		setDaemon(true);
		setName("HashGenerator");
	}

	/**
	 * Set the upload token used when submitting hashes. Only the first call has
	 * an effect, and only if hashing was not cancelled; it immediately submits
	 * the hashes finished so far.
	 * 
	 * @param token upload token of the running upload
	 */
	public void setUploadToken(String token) {
		if (!isCanceled && this.uploadToken == null) {
			this.uploadToken = token;
			submitHashes(false);
		}
		cleanupIfDone();
	}

	/**
	 * Reader loop: read every chunk from the file and queue it for hashing.
	 * Unreadable chunks are skipped; the loop stops on cancellation or when the
	 * hash pool has shut down. The file is always closed on exit.
	 */
	@Override
	public void run() {
		LOGGER.debug("Started hash reader worker");
		try {
			for (FileChunk chunk : chunkList) {
				if (isCanceled) {
					LOGGER.debug("Cancelled chunk reader (1)");
					break;
				}
				byte[] buffer = allocateBuffer(chunk.range.getLength());
				if (buffer == null) {
					// Cancelled or interrupted while waiting for memory
					LOGGER.debug("Cancelled chunk reader (1)");
					break;
				}
				try {
					file.seek(chunk.range.startOffset);
					file.readFully(buffer);
				} catch (IOException e) {
					// Do not queue a bogus block: hashing a short/empty buffer would throw in
					// Block.run(), and with CallerRunsPolicy that could end this reader loop.
					LOGGER.warn("Could not read file chunk " + chunk.getChunkIndex() + ", skipping", e);
					continue;
				}
				if (isCanceled) {
					LOGGER.debug("Cancelled chunk reader (2)");
					break;
				}
				//
				if (System.currentTimeMillis() > nextReadMsg) {
					nextReadMsg = System.currentTimeMillis() + 30000;
					LOGGER.debug("Read chunk " + chunk.getChunkIndex());
				}
				//
				if (!enqueue(new Block(chunk, buffer)))
					return;
			}
		} finally {
			Util.safeClose(file);
			readingDone = true;
			cleanupIfDone();
		}
	}

	/**
	 * Allocate a read buffer, waiting and retrying (for the same chunk) while
	 * the JVM is low on memory.
	 * 
	 * @param length buffer size in bytes
	 * @return the buffer, or null if hashing was cancelled or the thread was
	 *         interrupted while waiting
	 */
	private byte[] allocateBuffer(int length) {
		for (;;) {
			try {
				return new byte[length];
			} catch (OutOfMemoryError e) {
				LOGGER.info("Low memory - slowing down hashing");
				if (!Util.sleep(5000) || isCanceled)
					return null;
			}
		}
	}

	/**
	 * Hand a block to the hash pool, retrying while it gets rejected.
	 * 
	 * @param block block to hash
	 * @return false if the pool has shut down and reading must stop
	 */
	private boolean enqueue(Block block) {
		for (;;) {
			if (HASH_WORK_POOL.isTerminating() || HASH_WORK_POOL.isTerminated() || HASH_WORK_POOL.isShutdown()) {
				LOGGER.warn("Aborting current hash job - pool has shut down");
				Thread.currentThread().interrupt();
				return false;
			}
			try {
				HASH_WORK_POOL.execute(block);
			} catch (RejectedExecutionException e) {
				LOGGER.warn("Hash pool worker rejected a hash job!? Retrying...");
				Util.sleep(1000);
				continue;
			}
			// Don't hash too furiously in the background if the upload didn't start yet
			if (uploadToken == null && block.chunk.getChunkIndex() > 4) {
				Util.sleep(200);
			}
			return true;
		}
	}

	/**
	 * Stop reading and hashing. Results of hash jobs still in the pool are
	 * discarded.
	 */
	public void cancel() {
		LOGGER.debug("Cancelled externally");
		isCanceled = true;
	}

	/**
	 * Worker for hashing chunk. Processed via thread pool.
	 */
	private class Block implements Runnable {
		public final FileChunk chunk;
		public final byte[] buffer;

		/**
		 * @param chunk chunk metadata (index and range)
		 * @param buffer chunk contents; must hold at least
		 *            {@code chunk.range.getLength()} bytes
		 */
		public Block(FileChunk chunk, byte[] buffer) {
			this.chunk = chunk;
			this.buffer = buffer;
		}

		@Override
		public void run() {
			if (isCanceled)
				return; // result would be discarded anyway
			MessageDigest digester = SHA1_DIGESTER.get();
			digester.update(buffer, 0, chunk.range.getLength());
			byte[] hash = digester.digest();
			hashDone(chunk, hash);
		}
	}

	/**
	 * Called by worker thread when a chunk has been hashed.
	 * This means this method is not running in the current AsyncHashGenerator
	 * thread but one of the workers.
	 * <p>
	 * Stores the hash, advances the finished prefix, and submits the list to the
	 * server: every 20th chunk as a partial update, and with up to 10 retries
	 * once the final chunk completes the list.
	 * 
	 * @param chunk chunk that was hashed
	 * @param hash SHA-1 of the chunk contents
	 */
	private void hashDone(FileChunk chunk, byte[] hash) {
		int chunkIndex = chunk.getChunkIndex();
		boolean wasLastChunk = false;
		if (System.currentTimeMillis() > nextDoneMsg) {
			nextDoneMsg = System.currentTimeMillis() + 30000;
			LOGGER.debug("Done hashing chunk " + chunkIndex);
		}
		synchronized (chunkHashes) {
			if (isCanceled) {
				// cleanupIfDone() may already have cleared the lists; don't refill them
				return;
			}
			// Pad with nulls so the hash can be stored at its chunk index
			while (chunkHashes.size() < chunkIndex) {
				chunkHashes.add(null);
			}
			if (chunkHashes.size() == chunkIndex) {
				chunkHashes.add(ByteBuffer.wrap(hash));
			} else {
				chunkHashes.set(chunkIndex, ByteBuffer.wrap(hash));
			}
			// Only the chunk closing the gap can extend the contiguous finished prefix
			if (chunkIndex == finishedChunks) {
				while (finishedChunks < chunkHashes.size() && chunkHashes.get(finishedChunks) != null) {
					finishedChunks++;
					if (finishedChunks == chunkList.size()) {
						wasLastChunk = true;
					}
				}
			}
			if (chunkIndex + 1 == chunkList.size()) {
				LOGGER.debug("Hashed last chunk #" + chunkIndex + ", total=" + chunkList.size() + ", finished=" + finishedChunks);
			}
		}
		if (wasLastChunk) {
			// Last chunk was hashed - make sure list gets to the server
			// Try up to 10 times
			LOGGER.debug("Hashing done");
			for (int i = 0; i < 10; ++i) {
				if (submitHashes(true)) {
					LOGGER.debug("Hashes sent to server");
					break;
				}
				LOGGER.debug("Sending hashes failed...");
				if (!Util.sleep(2000))
					break; // Interrupted
			}
		} else if (chunkIndex % 20 == 0) {
			// Mid-hashing - update server side
			if (!submitHashes(false)) {
				LOGGER.warn("Server rejected partial block hash list");
				isCanceled = true;
			}
		}
		if (wasLastChunk) {
			hashingDone = true;
			cleanupIfDone();
		}
	}

	/**
	 * Drop references to all the chunk metadata - in case someone is still holding
	 * a reference to this class, at least we will not prevent this stuff from being
	 * garbage collected. Only does so once reading has stopped and either hashing
	 * was cancelled, or all hashes are done and an upload token was set.
	 */
	private void cleanupIfDone() {
		if (uploadToken == null && !isCanceled)
			return;
		if (!readingDone)
			return;
		if (!hashingDone && !isCanceled)
			return;
		// Same lock as hashDone()/submitHashes(), so they never see a half-cleared state
		synchronized (chunkHashes) {
			chunkHashes.clear();
			chunkList.clear();
		}
	}

	/**
	 * Submit current list of hashes (the contiguous finished prefix). Does
	 * nothing and reports success if no upload token is set yet or no hash is
	 * finished.
	 * 
	 * @param mustSucceed if true, a failure other than an invalid token also
	 *            makes this method return false
	 * @return false if the token is not known to the server, or if
	 *         mustSucceed is set and the call failed otherwise
	 */
	private boolean submitHashes(boolean mustSucceed) {
		List<ByteBuffer> subList;
		boolean d;
		synchronized (chunkHashes) {
			// The list may have been cleared by cleanupIfDone(); never index past its end
			int available = Math.min(finishedChunks, chunkHashes.size());
			subList = new ArrayList<>(chunkHashes.subList(0, available));
			d = (finishedChunks == chunkList.size());
		}
		if (!d) {
			d = System.currentTimeMillis() > nextSendingMsg;
		}
		if (d) {
			nextSendingMsg = System.currentTimeMillis() + 30000;
			LOGGER.debug("Preparing to send hash list to server (" + subList.size() + " / " + (uploadToken != null) + ")");
		}
		if (uploadToken == null || subList.isEmpty()) // No token yet, cannot submit, or empty list
			return true;
		try {
			if (d) LOGGER.debug("Making updateBlockHashes call");
			ThriftManager.getSatClient().updateBlockHashes(uploadToken, subList, Session.getSatelliteToken());
			if (d) LOGGER.debug("updateBlockHashes call succeeded");
		} catch (TInvalidTokenException e) {
			LOGGER.warn("Cannot send hashList to satellite: Sat claims uploadToken is invalid!");
			isCanceled = true;
			return false;
		} catch (Exception e) {
			LOGGER.warn("Unknown exception when submitting hashList to sat", e);
			if (mustSucceed)
				return false;
		}
		return true;
	}

}