diff options
Diffstat (limited to 'src/main/java/org/openslx')
7 files changed, 220 insertions, 117 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index bd927b1..27f8e8c 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -1,5 +1,6 @@ package org.openslx.filetransfer.util; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -92,7 +93,7 @@ public class ChunkList * Get CRC32 list in DNBD3 format. All checksums are little * endian and prefixed by the crc32 sum of the list itself. */ - public synchronized byte[] getDnbd3Crc32List() throws IOException + public synchronized byte[] getDnbd3Crc32List() throws IllegalStateException { byte buffer[] = new byte[ allChunks.size() * 4 + 4 ]; // 4 byte per chunk plus master long nextChunkOffset = 0; @@ -143,7 +144,6 @@ public class ChunkList * Returns true if this list contains a chunk with state MISSING, * which means the chunk doesn't have a sha1 known to exist in * another image. - * @return */ public synchronized boolean hasLocallyMissingChunk() { @@ -521,4 +521,21 @@ public class ChunkList return chunk.sha1sum != null && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, chunk.sha1sum ); } + /** + * Write DNBD3 CRC32 list to given file. + * + * @throws IllegalStateException + * @throws IOException + */ + public void writeCrc32List( String crcfile ) throws IllegalStateException, IOException + { + byte[] dnbd3Crc32List = null; + dnbd3Crc32List = getDnbd3Crc32List(); + if ( dnbd3Crc32List != null ) { + try ( FileOutputStream fos = new FileOutputStream( crcfile ) ) { + fos.write( dnbd3Crc32List ); + } + } + } + } diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index 41bd05a..abbcd35 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -15,9 +15,11 @@ import org.apache.logging.log4j.Logger; public class HashChecker { public static final int BLOCKING = 1; - public static final int CALC_HASH = 2; + public static final int CHECK_SHA1 = 2; public static final int CALC_CRC32 = 4; - + public static final int CALC_SHA1 = 8; + public static final int NO_SLOW_WARN = 16; + private static final Logger LOGGER = LogManager.getLogger( HashChecker.class ); private final BlockingQueue<HashTask> queue; @@ -27,7 +29,7 @@ public class HashChecker private final String algorithm; private boolean invalid = false; - + private final int queueCapacity; public HashChecker( String algorithm ) throws NoSuchAlgorithmException @@ -97,11 +99,12 @@ public class HashChecker public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, int flags ) throws InterruptedException { boolean blocking = ( flags & BLOCKING ) != 0; - boolean doHash = ( flags & CALC_HASH ) != 0; - boolean doCrc32 = ( flags & CALC_CRC32 ) != 0; - if ( doHash && chunk.getSha1Sum() == null ) + boolean checkSha1 = ( flags & CHECK_SHA1 ) != 0; + boolean calcCrc32 = ( flags & CALC_CRC32 ) != 0; + boolean calcSha1 = ( flags & CALC_SHA1 ) != 0; + if ( checkSha1 && chunk.getSha1Sum() == null ) throw new NullPointerException( "Chunk has no sha1 hash" ); - HashTask task = new HashTask( data, chunk, callback, doHash, doCrc32 ); + HashTask task = new HashTask( data, chunk, callback, checkSha1, calcCrc32, calcSha1 ); synchronized ( threads ) { if ( invalid ) { execCallback( task, HashResult.FAILURE ); @@ -133,15 +136,17 @@ public class HashChecker } } } - if ( doHash ) { + if ( checkSha1 ) { chunk.setStatus( ChunkStatus.HASHING ); } if ( blocking ) { long pre = System.currentTimeMillis(); queue.put( task ); - long duration = System.currentTimeMillis() - pre; - if ( duration > 1000 ) { - LOGGER.warn( "HashChecker.queue() took " + duration + "ms" ); + if ( ( flags & NO_SLOW_WARN ) == 0 ) { + long duration = System.currentTimeMillis() - pre; + if ( duration > 1000 ) { + LOGGER.warn( "HashChecker.queue() took " + duration + "ms" ); + } } } else { if ( !queue.offer( task ) ) { @@ -158,7 +163,7 @@ public class HashChecker { return queue.size(); } - + public int getQueueCapacity() { return queueCapacity; @@ -208,15 +213,19 @@ public class HashChecker break; } HashResult result = HashResult.NONE; - if ( task.doHash ) { + if ( task.checkSha1 || task.calcSha1 ) { // Calculate digest - md.update( task.data, 0, task.chunk.range.getLength() ); - byte[] digest = md.digest(); - result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; + md.update( task.data, 0, task.chunk.range.getLength() ); + byte[] digest = md.digest(); + if ( task.checkSha1 ) { + result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; + } else { + task.chunk.setSha1Sum( digest ); + } } - if ( task.doCrc32 ) { - // Calculate CRC32 - task.chunk.calculateDnbd3Crc32( task.data ); + if ( task.calcCrc32 ) { + // Calculate CRC32 + task.chunk.calculateDnbd3Crc32( task.data ); } execCallback( task, result ); } @@ -240,16 +249,18 @@ public class HashChecker public final byte[] data; public final FileChunk chunk; public final HashCheckCallback callback; - public final boolean doHash; - public final boolean doCrc32; + public final boolean checkSha1; + public final boolean calcCrc32; + public final boolean calcSha1; - public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean doHash, boolean doCrc32 ) + public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean checkSha1, boolean calcCrc32, boolean calcSha1 ) { this.data = data; this.chunk = chunk; this.callback = callback; - this.doHash = doHash; - this.doCrc32 = doCrc32; + this.checkSha1 = checkSha1; + this.calcCrc32 = calcCrc32; + this.calcSha1 = calcSha1; } } diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 8e68dc2..5cca7b8 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -252,7 +252,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H continue; } try { - if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) { // false == queue full, stop + if ( !hashChecker.queue( chunk, data, this, HashChecker.CHECK_SHA1 ) ) { // false == queue full, stop chunks.markCompleted( chunk, false ); break; } @@ -435,7 +435,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H InterruptedException passEx = null; if ( hashChecker != null && currentChunk.getSha1Sum() != null ) { try { - hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH ); + hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CHECK_SHA1 ); return true; } catch ( InterruptedException e ) { passEx = e; @@ -650,7 +650,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return; } try { - int flags = HashChecker.CALC_HASH; + int flags = HashChecker.CHECK_SHA1; if ( blocking ) { flags |= HashChecker.BLOCKING; } @@ -686,7 +686,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } } - protected HashChecker getHashChecker() + public static HashChecker getHashChecker() { return hashChecker; } diff --git a/src/main/java/org/openslx/util/AppUtil.java b/src/main/java/org/openslx/util/AppUtil.java index 340dc21..9a1e039 100644 --- a/src/main/java/org/openslx/util/AppUtil.java +++ b/src/main/java/org/openslx/util/AppUtil.java @@ -59,7 +59,9 @@ public class AppUtil jarFileStream = AppUtil.class.getProtectionDomain().getCodeSource().getLocation().openStream(); jarStream = new JarInputStream( jarFileStream ); final Manifest mf = jarStream.getManifest(); - manifestAttributes = mf.getMainAttributes(); + if ( mf != null ) { + manifestAttributes = mf.getMainAttributes(); + } } catch ( Exception e ) { LOGGER.warn( "Cannot read jar/manifest attributes", e ); } finally { diff --git a/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java b/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java new file mode 100644 index 0000000..005f45f --- /dev/null +++ b/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java @@ -0,0 +1,116 @@ +package org.openslx.util; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Cascaded thread pool. A thread pool behaving mostly like a normal + * TPE, until it is saturated and would reject execution. In that case, + * it will try to run the job in a global, shared fallback thread pool, + * and only reject execution if this also fails. + * The reasoning is that you can define rather small thread pools for + * different jobs, without having to use particularly high maximumPoolSize + * for cases of sudden high load. If you have a dozen thread pools that can + * grow to hundreds of threads, worst case you suddenly have a thousand + * threads around using up memory and everything's messed up. Instead, + * use conservative values like 8 or 16 as the maximum size, and rely on + * the CascadedThreadPoolExecutor to take load spikes. So, even if multiple + * parts of your application suddenly are hit with an unexpectedly high + * load, the overall number of threads can be kept within reasonable bounds + * and OOM situations are less likely to occur. + * Also, in case some part of the application saturated the shared pool + * for extended periods of time, other "well behaving" parts of your + * application can still make progress with their small pools, in contrast + * to a design where everything in your application shares one giant + * thread pool directly. + */ +public class CascadedThreadPoolExecutor extends ThreadPoolExecutor +{ + + private static final RejectedExecutionHandler defaultHandler = new RejectedExecutionHandler() { + @Override + public void rejectedExecution( Runnable r, ThreadPoolExecutor executor ) + { + FALLBACK_TPE.execute( r ); + } + }; + + private static final ThreadPoolExecutor FALLBACK_TPE = new ThreadPoolExecutor( 16, 128, + 1, TimeUnit.MINUTES, + new LinkedBlockingDeque<Runnable>( 4 ), + new PrioThreadFactory( "FallbackTP" ), + new AbortPolicy() ); + + static { + FALLBACK_TPE.allowCoreThreadTimeOut( true ); + } + + /** + * The RejectedExecutionHandler of this pool. We need to trigger this if the shared pool rejected + * the execution by throwing a RejectedExecutionException. + */ + private RejectedExecutionHandler customRejectionHandler = null; + + public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue<Runnable> queue, String threadNamePrefix ) + { + this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, + new PrioThreadFactory( threadNamePrefix ), null ); + } + + public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, int queueSize, String threadNamePrefix ) + { + this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, null, threadNamePrefix ); + } + + public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + int queueSize, ThreadFactory threadFactory ) + { + this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, threadFactory, null ); + } + + public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + int queueSize, RejectedExecutionHandler handler, String threadNamePrefix ) + { + this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, new PrioThreadFactory( threadNamePrefix ), + handler ); + } + + public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + int queueSize, ThreadFactory threadFactory, RejectedExecutionHandler handler ) + { + this( corePoolSize, maximumPoolSize, keepAliveTime, unit, new ArrayBlockingQueue<Runnable>( queueSize ), + threadFactory, handler ); + } + + public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler handler ) + { + // Only in super() call pass defaultHandler, not in this() calls! + super( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, + threadFactory, defaultHandler ); + this.customRejectionHandler = handler; + } + + @Override + public void execute( Runnable command ) + { + try { + super.execute( command ); + } catch ( RejectedExecutionException e ) { + if ( customRejectionHandler == null || ( customRejectionHandler.getClass().equals( AbortPolicy.class ) ) ) { + throw e; + } else { + customRejectionHandler.rejectedExecution( command, this ); + } + } + } + +} diff --git a/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java b/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java deleted file mode 100644 index 7b0b2d9..0000000 --- a/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java +++ /dev/null @@ -1,85 +0,0 @@ -package org.openslx.util; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * Grows to maximum pool size before queueing. See - * http://stackoverflow.com/a/20153234/2043481 - */ -public class GrowingThreadPoolExecutor extends ThreadPoolExecutor { - private int userSpecifiedCorePoolSize; - private int taskCount; - - /** - * The default rejected execution handler - */ - private static final RejectedExecutionHandler defaultHandler = - new AbortPolicy(); - - public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, - TimeUnit unit, BlockingQueue<Runnable> workQueue) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, defaultHandler); - } - - public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); - } - - public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), - handler); - } - - public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); - userSpecifiedCorePoolSize = corePoolSize; - } - - @Override - public void execute(Runnable runnable) { - synchronized (this) { - taskCount++; - setCorePoolSizeToTaskCountWithinBounds(); - } - super.execute(runnable); - } - - @Override - protected void afterExecute(Runnable runnable, Throwable throwable) { - super.afterExecute(runnable, throwable); - synchronized (this) { - taskCount--; - setCorePoolSizeToTaskCountWithinBounds(); - } - } - - private void setCorePoolSizeToTaskCountWithinBounds() { - int threads = taskCount; - if (threads < userSpecifiedCorePoolSize) - threads = userSpecifiedCorePoolSize; - if (threads > getMaximumPoolSize()) - threads = getMaximumPoolSize(); - super.setCorePoolSize(threads); - } - - public void setCorePoolSize(int corePoolSize) { - synchronized (this) { - userSpecifiedCorePoolSize = corePoolSize; - } - } - - @Override - public int getCorePoolSize() { - synchronized (this) { - return userSpecifiedCorePoolSize; - } - } -}
\ No newline at end of file diff --git a/src/main/java/org/openslx/util/Util.java b/src/main/java/org/openslx/util/Util.java index f475d01..e425083 100644 --- a/src/main/java/org/openslx/util/Util.java +++ b/src/main/java/org/openslx/util/Util.java @@ -74,7 +74,6 @@ public class Util * * @param value string representation to parse to an int * @param defaultValue fallback value if given string can't be parsed - * @return */ public static int parseInt( String value, int defaultValue ) { @@ -85,6 +84,23 @@ public class Util } } + /** + * Parse the given String as a base10 long. + * If the string does not represent a valid long, return the given + * default value. + * + * @param value string representation to parse to a long + * @param defaultValue fallback value if given string can't be parsed + */ + public static long parseLong( String value, long defaultValue ) + { + try { + return Long.parseLong( value ); + } catch ( Exception e ) { + return defaultValue; + } + } + public static void safeClose( AutoCloseable... closeable ) { for ( AutoCloseable c : closeable ) { @@ -118,9 +134,35 @@ public class Util } } + /** + * Number of seconds elapsed since 1970-01-01 UTC. + */ public static long unixTime() { return System.currentTimeMillis() / 1000; } + /** + * Monotonic tick count in milliseconds, not bound to RTC. + */ + public static long tickCount() + { + return System.nanoTime() / 1000_000; + } + + private static final String[] UNITS = new String[] { "B", "KB", "MB", "GB", "TB", "PB", "???" }; + + public static String formatBytes( double val ) + { + int unit = 0; + while ( Math.abs(val) > 1024 ) { + val /= 1024; + unit++; + } + if (unit >= UNITS.length) { + unit = UNITS.length - 1; + } + return String.format( "%.1f %s", val, UNITS[unit] ); + } + } |