summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx')
-rw-r--r--src/main/java/org/openslx/filetransfer/util/ChunkList.java21
-rw-r--r--src/main/java/org/openslx/filetransfer/util/HashChecker.java59
-rw-r--r--src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java8
-rw-r--r--src/main/java/org/openslx/util/AppUtil.java4
-rw-r--r--src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java116
-rw-r--r--src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java85
-rw-r--r--src/main/java/org/openslx/util/Util.java44
7 files changed, 220 insertions, 117 deletions
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index bd927b1..27f8e8c 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -1,5 +1,6 @@
package org.openslx.filetransfer.util;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -92,7 +93,7 @@ public class ChunkList
* Get CRC32 list in DNBD3 format. All checksums are little
* endian and prefixed by the crc32 sum of the list itself.
*/
- public synchronized byte[] getDnbd3Crc32List() throws IOException
+ public synchronized byte[] getDnbd3Crc32List() throws IllegalStateException
{
byte buffer[] = new byte[ allChunks.size() * 4 + 4 ]; // 4 byte per chunk plus master
long nextChunkOffset = 0;
@@ -143,7 +144,6 @@ public class ChunkList
* Returns true if this list contains a chunk with state MISSING,
* which means the chunk doesn't have a sha1 known to exist in
* another image.
- * @return
*/
public synchronized boolean hasLocallyMissingChunk()
{
@@ -521,4 +521,21 @@ public class ChunkList
return chunk.sha1sum != null && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, chunk.sha1sum );
}
+ /**
+ * Write DNBD3 CRC32 list to given file.
+ *
+ * @throws IllegalStateException
+ * @throws IOException
+ */
+ public void writeCrc32List( String crcfile ) throws IllegalStateException, IOException
+ {
+ byte[] dnbd3Crc32List = null;
+ dnbd3Crc32List = getDnbd3Crc32List();
+ if ( dnbd3Crc32List != null ) {
+ try ( FileOutputStream fos = new FileOutputStream( crcfile ) ) {
+ fos.write( dnbd3Crc32List );
+ }
+ }
+ }
+
}
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
index 41bd05a..abbcd35 100644
--- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java
+++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
@@ -15,9 +15,11 @@ import org.apache.logging.log4j.Logger;
public class HashChecker
{
public static final int BLOCKING = 1;
- public static final int CALC_HASH = 2;
+ public static final int CHECK_SHA1 = 2;
public static final int CALC_CRC32 = 4;
-
+ public static final int CALC_SHA1 = 8;
+ public static final int NO_SLOW_WARN = 16;
+
private static final Logger LOGGER = LogManager.getLogger( HashChecker.class );
private final BlockingQueue<HashTask> queue;
@@ -27,7 +29,7 @@ public class HashChecker
private final String algorithm;
private boolean invalid = false;
-
+
private final int queueCapacity;
public HashChecker( String algorithm ) throws NoSuchAlgorithmException
@@ -97,11 +99,12 @@ public class HashChecker
public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, int flags ) throws InterruptedException
{
boolean blocking = ( flags & BLOCKING ) != 0;
- boolean doHash = ( flags & CALC_HASH ) != 0;
- boolean doCrc32 = ( flags & CALC_CRC32 ) != 0;
- if ( doHash && chunk.getSha1Sum() == null )
+ boolean checkSha1 = ( flags & CHECK_SHA1 ) != 0;
+ boolean calcCrc32 = ( flags & CALC_CRC32 ) != 0;
+ boolean calcSha1 = ( flags & CALC_SHA1 ) != 0;
+ if ( checkSha1 && chunk.getSha1Sum() == null )
throw new NullPointerException( "Chunk has no sha1 hash" );
- HashTask task = new HashTask( data, chunk, callback, doHash, doCrc32 );
+ HashTask task = new HashTask( data, chunk, callback, checkSha1, calcCrc32, calcSha1 );
synchronized ( threads ) {
if ( invalid ) {
execCallback( task, HashResult.FAILURE );
@@ -133,15 +136,17 @@ public class HashChecker
}
}
}
- if ( doHash ) {
+ if ( checkSha1 ) {
chunk.setStatus( ChunkStatus.HASHING );
}
if ( blocking ) {
long pre = System.currentTimeMillis();
queue.put( task );
- long duration = System.currentTimeMillis() - pre;
- if ( duration > 1000 ) {
- LOGGER.warn( "HashChecker.queue() took " + duration + "ms" );
+ if ( ( flags & NO_SLOW_WARN ) == 0 ) {
+ long duration = System.currentTimeMillis() - pre;
+ if ( duration > 1000 ) {
+ LOGGER.warn( "HashChecker.queue() took " + duration + "ms" );
+ }
}
} else {
if ( !queue.offer( task ) ) {
@@ -158,7 +163,7 @@ public class HashChecker
{
return queue.size();
}
-
+
public int getQueueCapacity()
{
return queueCapacity;
@@ -208,15 +213,19 @@ public class HashChecker
break;
}
HashResult result = HashResult.NONE;
- if ( task.doHash ) {
+ if ( task.checkSha1 || task.calcSha1 ) {
// Calculate digest
- md.update( task.data, 0, task.chunk.range.getLength() );
- byte[] digest = md.digest();
- result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
+ md.update( task.data, 0, task.chunk.range.getLength() );
+ byte[] digest = md.digest();
+ if ( task.checkSha1 ) {
+ result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
+ } else {
+ task.chunk.setSha1Sum( digest );
+ }
}
- if ( task.doCrc32 ) {
- // Calculate CRC32
- task.chunk.calculateDnbd3Crc32( task.data );
+ if ( task.calcCrc32 ) {
+ // Calculate CRC32
+ task.chunk.calculateDnbd3Crc32( task.data );
}
execCallback( task, result );
}
@@ -240,16 +249,18 @@ public class HashChecker
public final byte[] data;
public final FileChunk chunk;
public final HashCheckCallback callback;
- public final boolean doHash;
- public final boolean doCrc32;
+ public final boolean checkSha1;
+ public final boolean calcCrc32;
+ public final boolean calcSha1;
- public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean doHash, boolean doCrc32 )
+ public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean checkSha1, boolean calcCrc32, boolean calcSha1 )
{
this.data = data;
this.chunk = chunk;
this.callback = callback;
- this.doHash = doHash;
- this.doCrc32 = doCrc32;
+ this.checkSha1 = checkSha1;
+ this.calcCrc32 = calcCrc32;
+ this.calcSha1 = calcSha1;
}
}
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index 8e68dc2..5cca7b8 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -252,7 +252,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
continue;
}
try {
- if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) { // false == queue full, stop
+ if ( !hashChecker.queue( chunk, data, this, HashChecker.CHECK_SHA1 ) ) { // false == queue full, stop
chunks.markCompleted( chunk, false );
break;
}
@@ -435,7 +435,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
InterruptedException passEx = null;
if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
try {
- hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH );
+ hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CHECK_SHA1 );
return true;
} catch ( InterruptedException e ) {
passEx = e;
@@ -650,7 +650,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
return;
}
try {
- int flags = HashChecker.CALC_HASH;
+ int flags = HashChecker.CHECK_SHA1;
if ( blocking ) {
flags |= HashChecker.BLOCKING;
}
@@ -686,7 +686,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
}
- protected HashChecker getHashChecker()
+ public static HashChecker getHashChecker()
{
return hashChecker;
}
diff --git a/src/main/java/org/openslx/util/AppUtil.java b/src/main/java/org/openslx/util/AppUtil.java
index 340dc21..9a1e039 100644
--- a/src/main/java/org/openslx/util/AppUtil.java
+++ b/src/main/java/org/openslx/util/AppUtil.java
@@ -59,7 +59,9 @@ public class AppUtil
jarFileStream = AppUtil.class.getProtectionDomain().getCodeSource().getLocation().openStream();
jarStream = new JarInputStream( jarFileStream );
final Manifest mf = jarStream.getManifest();
- manifestAttributes = mf.getMainAttributes();
+ if ( mf != null ) {
+ manifestAttributes = mf.getMainAttributes();
+ }
} catch ( Exception e ) {
LOGGER.warn( "Cannot read jar/manifest attributes", e );
} finally {
diff --git a/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java b/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java
new file mode 100644
index 0000000..005f45f
--- /dev/null
+++ b/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java
@@ -0,0 +1,116 @@
+package org.openslx.util;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cascaded thread pool. A thread pool behaving mostly like a normal
+ * TPE, until it is saturated and would reject execution. In that case,
+ * it will try to run the job in a global, shared fallback thread pool,
+ * and only reject execution if this also fails.
+ * The reasoning is that you can define rather small thread pools for
+ * different jobs, without having to use particularly high maximumPoolSize
+ * for cases of sudden high load. If you have a dozen thread pools that can
+ * grow to hundreds of threads, worst case you suddenly have a thousand
+ * threads around using up memory and everything's messed up. Instead,
+ * use conservative values like 8 or 16 as the maximum size, and rely on
+ * the CascadedThreadPoolExecutor to take load spikes. So, even if multiple
+ * parts of your application suddenly are hit with an unexpectedly high
+ * load, the overall number of threads can be kept within reasonable bounds
+ * and OOM situations are less likely to occur.
+ * Also, in case some part of the application saturated the shared pool
+ * for extended periods of time, other "well behaving" parts of your
+ * application can still make progress with their small pools, in contrast
+ * to a design where everything in your application shares one giant
+ * thread pool directly.
+ */
+public class CascadedThreadPoolExecutor extends ThreadPoolExecutor
+{
+
+ private static final RejectedExecutionHandler defaultHandler = new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
+ {
+ FALLBACK_TPE.execute( r );
+ }
+ };
+
+ private static final ThreadPoolExecutor FALLBACK_TPE = new ThreadPoolExecutor( 16, 128,
+ 1, TimeUnit.MINUTES,
+ new LinkedBlockingDeque<Runnable>( 4 ),
+ new PrioThreadFactory( "FallbackTP" ),
+ new AbortPolicy() );
+
+ static {
+ FALLBACK_TPE.allowCoreThreadTimeOut( true );
+ }
+
+ /**
+ * The RejectedExecutionHandler of this pool. We need to trigger this if the shared pool rejected
+ * the execution by throwing a RejectedExecutionException.
+ */
+ private RejectedExecutionHandler customRejectionHandler = null;
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime,
+ TimeUnit unit, BlockingQueue<Runnable> queue, String threadNamePrefix )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue,
+ new PrioThreadFactory( threadNamePrefix ), null );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime,
+ TimeUnit unit, int queueSize, String threadNamePrefix )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, null, threadNamePrefix );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ int queueSize, ThreadFactory threadFactory )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, threadFactory, null );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ int queueSize, RejectedExecutionHandler handler, String threadNamePrefix )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, new PrioThreadFactory( threadNamePrefix ),
+ handler );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ int queueSize, ThreadFactory threadFactory, RejectedExecutionHandler handler )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, new ArrayBlockingQueue<Runnable>( queueSize ),
+ threadFactory, handler );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler handler )
+ {
+ // Only in super() call pass defaultHandler, not in this() calls!
+ super( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue,
+ threadFactory, defaultHandler );
+ this.customRejectionHandler = handler;
+ }
+
+ @Override
+ public void execute( Runnable command )
+ {
+ try {
+ super.execute( command );
+ } catch ( RejectedExecutionException e ) {
+ if ( customRejectionHandler == null || ( customRejectionHandler.getClass().equals( AbortPolicy.class ) ) ) {
+ throw e;
+ } else {
+ customRejectionHandler.rejectedExecution( command, this );
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java b/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java
deleted file mode 100644
index 7b0b2d9..0000000
--- a/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.openslx.util;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Grows to maximum pool size before queueing. See
- * http://stackoverflow.com/a/20153234/2043481
- */
-public class GrowingThreadPoolExecutor extends ThreadPoolExecutor {
- private int userSpecifiedCorePoolSize;
- private int taskCount;
-
- /**
- * The default rejected execution handler
- */
- private static final RejectedExecutionHandler defaultHandler =
- new AbortPolicy();
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
- TimeUnit unit, BlockingQueue<Runnable> workQueue) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, defaultHandler);
- }
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
- }
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(),
- handler);
- }
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- userSpecifiedCorePoolSize = corePoolSize;
- }
-
- @Override
- public void execute(Runnable runnable) {
- synchronized (this) {
- taskCount++;
- setCorePoolSizeToTaskCountWithinBounds();
- }
- super.execute(runnable);
- }
-
- @Override
- protected void afterExecute(Runnable runnable, Throwable throwable) {
- super.afterExecute(runnable, throwable);
- synchronized (this) {
- taskCount--;
- setCorePoolSizeToTaskCountWithinBounds();
- }
- }
-
- private void setCorePoolSizeToTaskCountWithinBounds() {
- int threads = taskCount;
- if (threads < userSpecifiedCorePoolSize)
- threads = userSpecifiedCorePoolSize;
- if (threads > getMaximumPoolSize())
- threads = getMaximumPoolSize();
- super.setCorePoolSize(threads);
- }
-
- public void setCorePoolSize(int corePoolSize) {
- synchronized (this) {
- userSpecifiedCorePoolSize = corePoolSize;
- }
- }
-
- @Override
- public int getCorePoolSize() {
- synchronized (this) {
- return userSpecifiedCorePoolSize;
- }
- }
-} \ No newline at end of file
diff --git a/src/main/java/org/openslx/util/Util.java b/src/main/java/org/openslx/util/Util.java
index f475d01..e425083 100644
--- a/src/main/java/org/openslx/util/Util.java
+++ b/src/main/java/org/openslx/util/Util.java
@@ -74,7 +74,6 @@ public class Util
*
* @param value string representation to parse to an int
* @param defaultValue fallback value if given string can't be parsed
- * @return
*/
public static int parseInt( String value, int defaultValue )
{
@@ -85,6 +84,23 @@ public class Util
}
}
+ /**
+ * Parse the given String as a base10 long.
+ * If the string does not represent a valid long, return the given
+ * default value.
+ *
+ * @param value string representation to parse to a long
+ * @param defaultValue fallback value if given string can't be parsed
+ */
+ public static long parseLong( String value, long defaultValue )
+ {
+ try {
+ return Long.parseLong( value );
+ } catch ( Exception e ) {
+ return defaultValue;
+ }
+ }
+
public static void safeClose( AutoCloseable... closeable )
{
for ( AutoCloseable c : closeable ) {
@@ -118,9 +134,35 @@ public class Util
}
}
+ /**
+ * Number of seconds elapsed since 1970-01-01 UTC.
+ */
public static long unixTime()
{
return System.currentTimeMillis() / 1000;
}
+ /**
+ * Monotonic tick count in milliseconds, not bound to RTC.
+ */
+ public static long tickCount()
+ {
+ return System.nanoTime() / 1000_000;
+ }
+
+ private static final String[] UNITS = new String[] { "B", "KB", "MB", "GB", "TB", "PB", "???" };
+
+ public static String formatBytes( double val )
+ {
+ int unit = 0;
+ while ( Math.abs(val) > 1024 ) {
+ val /= 1024;
+ unit++;
+ }
+ if (unit >= UNITS.length) {
+ unit = UNITS.length - 1;
+ }
+ return String.format( "%.1f %s", val, UNITS[unit] );
+ }
+
}