summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/fi/iki/elonen/NanoHTTPD.java33
-rw-r--r--src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java116
-rw-r--r--src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java85
3 files changed, 116 insertions, 118 deletions
diff --git a/src/main/java/fi/iki/elonen/NanoHTTPD.java b/src/main/java/fi/iki/elonen/NanoHTTPD.java
index 395eca7..0d21381 100644
--- a/src/main/java/fi/iki/elonen/NanoHTTPD.java
+++ b/src/main/java/fi/iki/elonen/NanoHTTPD.java
@@ -60,15 +60,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.openslx.util.GrowingThreadPoolExecutor;
-import org.openslx.util.PrioThreadFactory;
import org.openslx.util.Util;
/**
@@ -155,35 +151,6 @@ public abstract class NanoHTTPD implements Runnable
protected int maxRequestSize = 0;
/**
- * Constructs an HTTP server on given port.
- */
- public NanoHTTPD( int port ) throws IOException
- {
- this( null, port );
- }
-
- /**
- * @param hostname Address to listen on
- * @param port Port to listen on
- */
- public NanoHTTPD( String hostname, int port ) throws IOException
- {
- this( hostname, port, 24, 16 );
- }
-
- /**
- * @param hostname Address to listen on
- * @param port Port to listen on
- * @param maxThreads Maximum number of threads to spawn before we start queuing requests
- * @param maxQueue Maximum number of requests we queue before we start rejecting them with 503
- */
- public NanoHTTPD( String hostname, int port, int maxThreads, int maxQueue ) throws IOException
- {
- this( hostname, port, new GrowingThreadPoolExecutor( 2, maxThreads, 1, TimeUnit.MINUTES,
- new ArrayBlockingQueue<Runnable>( maxQueue ), new PrioThreadFactory( "httpd" ) ) );
- }
-
- /**
* Constructs an HTTP server on given hostname and port.
*/
public NanoHTTPD( String hostname, int port, ExecutorService executor ) throws IOException
diff --git a/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java b/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java
new file mode 100644
index 0000000..005f45f
--- /dev/null
+++ b/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java
@@ -0,0 +1,116 @@
+package org.openslx.util;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cascaded thread pool. A thread pool behaving mostly like a normal
+ * TPE, until it is saturated and would reject execution. In that case,
+ * it will try to run the job in a global, shared fallback thread pool,
+ * and only reject execution if this also fails.
+ * The reasoning is that you can define rather small thread pools for
+ * different jobs, without having to use particularly high maximumPoolSize
+ * for cases of sudden high load. If you have a dozen thread pools that can
+ * grow to hundreds of threads, worst case you suddenly have a thousand
+ * threads around using up memory and everything's messed up. Instead,
+ * use conservative values like 8 or 16 as the maximum size, and rely on
+ * the CascadedThreadPoolExecutor to take load spikes. So, even if multiple
+ * parts of your application suddenly are hit with an unexpectedly high
+ * load, the overall number of threads can be kept within reasonable bounds
+ * and OOM situations are less likely to occur.
+ * Also, in case some part of the application saturated the shared pool
+ * for extended periods of time, other "well behaving" parts of your
+ * application can still make progress with their small pools, in contrast
+ * to a design where everything in your application shares one giant
+ * thread pool directly.
+ */
+public class CascadedThreadPoolExecutor extends ThreadPoolExecutor
+{
+
+ private static final RejectedExecutionHandler defaultHandler = new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
+ {
+ FALLBACK_TPE.execute( r );
+ }
+ };
+
+ private static final ThreadPoolExecutor FALLBACK_TPE = new ThreadPoolExecutor( 16, 128,
+ 1, TimeUnit.MINUTES,
+ new LinkedBlockingDeque<Runnable>( 4 ),
+ new PrioThreadFactory( "FallbackTP" ),
+ new AbortPolicy() );
+
+ static {
+ FALLBACK_TPE.allowCoreThreadTimeOut( true );
+ }
+
+ /**
+ * The RejectedExecutionHandler of this pool. We need to trigger this if the shared pool rejected
+ * the execution by throwing a RejectedExecutionException.
+ */
+ private RejectedExecutionHandler customRejectionHandler = null;
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime,
+ TimeUnit unit, BlockingQueue<Runnable> queue, String threadNamePrefix )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue,
+ new PrioThreadFactory( threadNamePrefix ), null );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime,
+ TimeUnit unit, int queueSize, String threadNamePrefix )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, null, threadNamePrefix );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ int queueSize, ThreadFactory threadFactory )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, threadFactory, null );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ int queueSize, RejectedExecutionHandler handler, String threadNamePrefix )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, new PrioThreadFactory( threadNamePrefix ),
+ handler );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ int queueSize, ThreadFactory threadFactory, RejectedExecutionHandler handler )
+ {
+ this( corePoolSize, maximumPoolSize, keepAliveTime, unit, new ArrayBlockingQueue<Runnable>( queueSize ),
+ threadFactory, handler );
+ }
+
+ public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler handler )
+ {
+ // Only in super() call pass defaultHandler, not in this() calls!
+ super( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue,
+ threadFactory, defaultHandler );
+ this.customRejectionHandler = handler;
+ }
+
+ @Override
+ public void execute( Runnable command )
+ {
+ try {
+ super.execute( command );
+ } catch ( RejectedExecutionException e ) {
+ if ( customRejectionHandler == null || ( customRejectionHandler.getClass().equals( AbortPolicy.class ) ) ) {
+ throw e;
+ } else {
+ customRejectionHandler.rejectedExecution( command, this );
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java b/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java
deleted file mode 100644
index 7b0b2d9..0000000
--- a/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.openslx.util;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Grows to maximum pool size before queueing. See
- * http://stackoverflow.com/a/20153234/2043481
- */
-public class GrowingThreadPoolExecutor extends ThreadPoolExecutor {
- private int userSpecifiedCorePoolSize;
- private int taskCount;
-
- /**
- * The default rejected execution handler
- */
- private static final RejectedExecutionHandler defaultHandler =
- new AbortPolicy();
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
- TimeUnit unit, BlockingQueue<Runnable> workQueue) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, defaultHandler);
- }
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
- }
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(),
- handler);
- }
-
- public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- userSpecifiedCorePoolSize = corePoolSize;
- }
-
- @Override
- public void execute(Runnable runnable) {
- synchronized (this) {
- taskCount++;
- setCorePoolSizeToTaskCountWithinBounds();
- }
- super.execute(runnable);
- }
-
- @Override
- protected void afterExecute(Runnable runnable, Throwable throwable) {
- super.afterExecute(runnable, throwable);
- synchronized (this) {
- taskCount--;
- setCorePoolSizeToTaskCountWithinBounds();
- }
- }
-
- private void setCorePoolSizeToTaskCountWithinBounds() {
- int threads = taskCount;
- if (threads < userSpecifiedCorePoolSize)
- threads = userSpecifiedCorePoolSize;
- if (threads > getMaximumPoolSize())
- threads = getMaximumPoolSize();
- super.setCorePoolSize(threads);
- }
-
- public void setCorePoolSize(int corePoolSize) {
- synchronized (this) {
- userSpecifiedCorePoolSize = corePoolSize;
- }
- }
-
- @Override
- public int getCorePoolSize() {
- synchronized (this) {
- return userSpecifiedCorePoolSize;
- }
- }
-} \ No newline at end of file