summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/util')
-rw-r--r--src/main/java/org/openslx/util/AppUtil.java4
-rw-r--r--src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java116
-rw-r--r--src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java85
-rw-r--r--src/main/java/org/openslx/util/Util.java9
4 files changed, 124 insertions, 90 deletions
diff --git a/src/main/java/org/openslx/util/AppUtil.java b/src/main/java/org/openslx/util/AppUtil.java
index 340dc21..9a1e039 100644
--- a/src/main/java/org/openslx/util/AppUtil.java
+++ b/src/main/java/org/openslx/util/AppUtil.java
@@ -59,7 +59,9 @@ public class AppUtil
jarFileStream = AppUtil.class.getProtectionDomain().getCodeSource().getLocation().openStream();
jarStream = new JarInputStream( jarFileStream );
final Manifest mf = jarStream.getManifest();
- manifestAttributes = mf.getMainAttributes();
+ if ( mf != null ) {
+ manifestAttributes = mf.getMainAttributes();
+ }
} catch ( Exception e ) {
LOGGER.warn( "Cannot read jar/manifest attributes", e );
} finally {
diff --git a/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java b/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java
new file mode 100644
index 0000000..005f45f
--- /dev/null
+++ b/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java
@@ -0,0 +1,116 @@
+package org.openslx.util;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cascaded thread pool. A thread pool behaving mostly like a normal
+ * TPE, until it is saturated and would reject execution. In that case,
+ * it will try to run the job in a global, shared fallback thread pool,
+ * and only reject execution if this also fails.
+ * The reasoning is that you can define rather small thread pools for
+ * different jobs, without having to use particularly high maximumPoolSize
+ * for cases of sudden high load. If you have a dozen thread pools that can
+ * grow to hundreds of threads, worst case you suddenly have a thousand
+ * threads around using up memory and everything's messed up. Instead,
+ * use conservative values like 8 or 16 as the maximum size, and rely on
+ * the CascadedThreadPoolExecutor to take load spikes. So, even if multiple
+ * parts of your application suddenly are hit with an unexpectedly high
+ * load, the overall number of threads can be kept within reasonable bounds
+ * and OOM situations are less likely to occur.
+ * Also, in case some part of the application saturated the shared pool
+ * for extended periods of time, other "well behaving" parts of your
+ * application can still make progress with their small pools, in contrast
+ * to a design where everything in your application shares one giant
+ * thread pool directly.
+ */
+public class CascadedThreadPoolExecutor extends ThreadPoolExecutor
+{
+
+ private static final RejectedExecutionHandler defaultHandler = new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
+ {
+ FALLBACK_TPE.execute( r );
+ }
+ };
+
+ private static final ThreadPoolExecutor FALLBACK_TPE = new ThreadPoolExecutor( 16, 128,
+ 1, TimeUnit.MINUTES,
+ new LinkedBlockingDeque<Runnable>( 4 ),
+ new PrioThreadFactory( "FallbackTP" ),
+ new AbortPolicy() );
+
+ static {
+ FALLBACK_TPE.allowCoreThreadTimeOut( true );
+ }
+
+ /**
+ * The RejectedExecutionHandler of this pool. We need to trigger this if the shared pool rejected
+ * the execution by throwing a RejectedExecutionException.
+ */
+ private RejectedExecutionHandler customRejectionHandler = null;
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime,
+ TimeUnit unit, BlockingQueue<Runnable> queue, String threadNamePrefix )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue,
+ new PrioThreadFactory( threadNamePrefix ), null );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime,
+ TimeUnit unit, int queueSize, String threadNamePrefix )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, null, threadNamePrefix );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ int queueSize, ThreadFactory threadFactory )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, threadFactory, null );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ int queueSize, RejectedExecutionHandler handler, String threadNamePrefix )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, new PrioThreadFactory( threadNamePrefix ),
+ handler );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ int queueSize, ThreadFactory threadFactory, RejectedExecutionHandler handler )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, new ArrayBlockingQueue<Runnable>( queueSize ),
+ threadFactory, handler );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler handler )
+ {
+ // Only in super() call pass defaultHandler, not in this() calls!
+ super( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue,
+ threadFactory, defaultHandler );
+ this.customRejectionHandler = handler;
+ }
+
+ @Override
+ public void execute( Runnable command )
+ {
+ try {
+ super.execute( command );
+ } catch ( RejectedExecutionException e ) {
+ if ( customRejectionHandler == null || ( customRejectionHandler.getClass().equals( AbortPolicy.class ) ) ) {
+ throw e;
+ } else {
+ customRejectionHandler.rejectedExecution( command, this );
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java b/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java
deleted file mode 100644
index 7b0b2d9..0000000
--- a/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.openslx.util;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Grows to maximum pool size before queueing. See
- * http://stackoverflow.com/a/20153234/2043481
- */
-public class GrowingThreadPoolExecutor extends ThreadPoolExecutor {
- private int userSpecifiedCorePoolSize;
- private int taskCount;
-
- /**
- * The default rejected execution handler
- */
- private static final RejectedExecutionHandler defaultHandler =
- new AbortPolicy();
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
- TimeUnit unit, BlockingQueue<Runnable> workQueue) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, defaultHandler);
- }
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
- }
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(),
- handler);
- }
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- userSpecifiedCorePoolSize = corePoolSize;
- }
-
- @Override
- public void execute(Runnable runnable) {
- synchronized (this) {
- taskCount++;
- setCorePoolSizeToTaskCountWithinBounds();
- }
- super.execute(runnable);
- }
-
- @Override
- protected void afterExecute(Runnable runnable, Throwable throwable) {
- super.afterExecute(runnable, throwable);
- synchronized (this) {
- taskCount--;
- setCorePoolSizeToTaskCountWithinBounds();
- }
- }
-
- private void setCorePoolSizeToTaskCountWithinBounds() {
- int threads = taskCount;
- if (threads < userSpecifiedCorePoolSize)
- threads = userSpecifiedCorePoolSize;
- if (threads > getMaximumPoolSize())
- threads = getMaximumPoolSize();
- super.setCorePoolSize(threads);
- }
-
- public void setCorePoolSize(int corePoolSize) {
- synchronized (this) {
- userSpecifiedCorePoolSize = corePoolSize;
- }
- }
-
- @Override
- public int getCorePoolSize() {
- synchronized (this) {
- return userSpecifiedCorePoolSize;
- }
- }
-} \ No newline at end of file
diff --git a/src/main/java/org/openslx/util/Util.java b/src/main/java/org/openslx/util/Util.java
index 5300cfa..e425083 100644
--- a/src/main/java/org/openslx/util/Util.java
+++ b/src/main/java/org/openslx/util/Util.java
@@ -147,7 +147,7 @@ public class Util
*/
public static long tickCount()
{
- return System.nanoTime() / 1000;
+ return System.nanoTime() / 1000_000;
}
private static final String[] UNITS = new String[] { "B", "KB", "MB", "GB", "TB", "PB", "???" };
@@ -155,11 +155,12 @@ public class Util
public static String formatBytes( double val )
{
int unit = 0;
- while ( val > 1024 ) {
+ while ( Math.abs(val) > 1024 ) {
val /= 1024;
unit++;
- if (unit >= UNITS.length)
- break;
+ }
+ if (unit >= UNITS.length) {
+ unit = UNITS.length - 1;
}
return String.format( "%.1f %s", val, UNITS[unit] );
}