| author | Simon Rettberg | 2024-05-22 16:49:20 +0200 |
|---|---|---|
| committer | Simon Rettberg | 2024-05-22 16:49:20 +0200 |
| commit | 71a71f506fa6163bf30d4c70aba5d50f5f8cd9ef (patch) | |
| tree | 2b0cb478dcbbe714c68bb89177899ff72e8e9804 | |
| parent | [Util] Fix formatBytes unit overflow; make work for negative values (diff) | |
| download | master-sync-shared-71a71f506fa6163bf30d4c70aba5d50f5f8cd9ef.tar.gz master-sync-shared-71a71f506fa6163bf30d4c70aba5d50f5f8cd9ef.tar.xz master-sync-shared-71a71f506fa6163bf30d4c70aba5d50f5f8cd9ef.zip | |
Replace growing thread pool with cascaded one
Cascaded thread pools share a common fallback pool that takes over
when a specific thread pool instance is overloaded.
3 files changed, 116 insertions(+), 118 deletions(-)
```diff
diff --git a/src/main/java/fi/iki/elonen/NanoHTTPD.java b/src/main/java/fi/iki/elonen/NanoHTTPD.java
index 395eca7..0d21381 100644
--- a/src/main/java/fi/iki/elonen/NanoHTTPD.java
+++ b/src/main/java/fi/iki/elonen/NanoHTTPD.java
@@ -60,15 +60,11 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.StringTokenizer;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.openslx.util.GrowingThreadPoolExecutor;
-import org.openslx.util.PrioThreadFactory;
 import org.openslx.util.Util;
 
 /**
@@ -155,35 +151,6 @@ public abstract class NanoHTTPD implements Runnable
     protected int maxRequestSize = 0;
 
     /**
-     * Constructs an HTTP server on given port.
-     */
-    public NanoHTTPD( int port ) throws IOException
-    {
-        this( null, port );
-    }
-
-    /**
-     * @param hostname Address to listen on
-     * @param port Port to listen on
-     */
-    public NanoHTTPD( String hostname, int port ) throws IOException
-    {
-        this( hostname, port, 24, 16 );
-    }
-
-    /**
-     * @param hostname Address to listen on
-     * @param port Port to listen on
-     * @param maxThreads Maximum number of threads to spawn before we start queuing requests
-     * @param maxQueue Maximum number of requests we queue before we start rejecting them with 503
-     */
-    public NanoHTTPD( String hostname, int port, int maxThreads, int maxQueue ) throws IOException
-    {
-        this( hostname, port, new GrowingThreadPoolExecutor( 2, maxThreads, 1, TimeUnit.MINUTES,
-                new ArrayBlockingQueue<Runnable>( maxQueue ), new PrioThreadFactory( "httpd" ) ) );
-    }
-
-    /**
      * Constructs an HTTP server on given hostname and port.
      */
     public NanoHTTPD( String hostname, int port, ExecutorService executor ) throws IOException
diff --git a/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java b/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java
new file mode 100644
index 0000000..005f45f
--- /dev/null
+++ b/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java
@@ -0,0 +1,116 @@
+package org.openslx.util;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cascaded thread pool. A thread pool behaving mostly like a normal
+ * TPE, until it is saturated and would reject execution. In that case,
+ * it will try to run the job in a global, shared fallback thread pool,
+ * and only reject execution if this also fails.
+ * The reasoning is that you can define rather small thread pools for
+ * different jobs, without having to use particularly high maximumPoolSize
+ * for cases of sudden high load. If you have a dozen thread pools that can
+ * grow to hundreds of threads, worst case you suddenly have a thousand
+ * threads around using up memory and everything's messed up. Instead,
+ * use conservative values like 8 or 16 as the maximum size, and rely on
+ * the CascadedThreadPoolExecutor to take load spikes. So, even if multiple
+ * parts of your application suddenly are hit with an unexpectedly high
+ * load, the overall number of threads can be kept within reasonable bounds
+ * and OOM situations are less likely to occur.
+ * Also, in case some part of the application saturated the shared pool
+ * for extended periods of time, other "well behaving" parts of your
+ * application can still make progress with their small pools, in contrast
+ * to a design where everything in your application shares one giant
+ * thread pool directly.
+ */
+public class CascadedThreadPoolExecutor extends ThreadPoolExecutor
+{
+
+    private static final RejectedExecutionHandler defaultHandler = new RejectedExecutionHandler() {
+        @Override
+        public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
+        {
+            FALLBACK_TPE.execute( r );
+        }
+    };
+
+    private static final ThreadPoolExecutor FALLBACK_TPE = new ThreadPoolExecutor( 16, 128,
+            1, TimeUnit.MINUTES,
+            new LinkedBlockingDeque<Runnable>( 4 ),
+            new PrioThreadFactory( "FallbackTP" ),
+            new AbortPolicy() );
+
+    static {
+        FALLBACK_TPE.allowCoreThreadTimeOut( true );
+    }
+
+    /**
+     * The RejectedExecutionHandler of this pool. We need to trigger this if the shared pool rejected
+     * the execution by throwing a RejectedExecutionException.
+     */
+    private RejectedExecutionHandler customRejectionHandler = null;
+
+    public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime,
+            TimeUnit unit, BlockingQueue<Runnable> queue, String threadNamePrefix )
+    {
+        this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue,
+                new PrioThreadFactory( threadNamePrefix ), null );
+    }
+
+    public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime,
+            TimeUnit unit, int queueSize, String threadNamePrefix )
+    {
+        this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, null, threadNamePrefix );
+    }
+
+    public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+            int queueSize, ThreadFactory threadFactory )
+    {
+        this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, threadFactory, null );
+    }
+
+    public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+            int queueSize, RejectedExecutionHandler handler, String threadNamePrefix )
+    {
+        this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, new PrioThreadFactory( threadNamePrefix ),
+                handler );
+    }
+
+    public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+            int queueSize, ThreadFactory threadFactory, RejectedExecutionHandler handler )
+    {
+        this( corePoolSize, maximumPoolSize, keepAliveTime, unit, new ArrayBlockingQueue<Runnable>( queueSize ),
+                threadFactory, handler );
+    }
+
+    public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+            BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler handler )
+    {
+        // Only in super() call pass defaultHandler, not in this() calls!
+        super( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue,
+                threadFactory, defaultHandler );
+        this.customRejectionHandler = handler;
+    }
+
+    @Override
+    public void execute( Runnable command )
+    {
+        try {
+            super.execute( command );
+        } catch ( RejectedExecutionException e ) {
+            if ( customRejectionHandler == null || ( customRejectionHandler.getClass().equals( AbortPolicy.class ) ) ) {
+                throw e;
+            } else {
+                customRejectionHandler.rejectedExecution( command, this );
+            }
+        }
+    }
+
+}
diff --git a/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java b/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java
deleted file mode 100644
index 7b0b2d9..0000000
--- a/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.openslx.util;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Grows to maximum pool size before queueing. See
- * http://stackoverflow.com/a/20153234/2043481
- */
-public class GrowingThreadPoolExecutor extends ThreadPoolExecutor {
-    private int userSpecifiedCorePoolSize;
-    private int taskCount;
-
-    /**
-     * The default rejected execution handler
-     */
-    private static final RejectedExecutionHandler defaultHandler =
-            new AbortPolicy();
-
-    public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
-            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, defaultHandler);
-    }
-
-    public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
-            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
-    }
-
-    public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
-            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(),
-                handler);
-    }
-
-    public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
-            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
-        userSpecifiedCorePoolSize = corePoolSize;
-    }
-
-    @Override
-    public void execute(Runnable runnable) {
-        synchronized (this) {
-            taskCount++;
-            setCorePoolSizeToTaskCountWithinBounds();
-        }
-        super.execute(runnable);
-    }
-
-    @Override
-    protected void afterExecute(Runnable runnable, Throwable throwable) {
-        super.afterExecute(runnable, throwable);
-        synchronized (this) {
-            taskCount--;
-            setCorePoolSizeToTaskCountWithinBounds();
-        }
-    }
-
-    private void setCorePoolSizeToTaskCountWithinBounds() {
-        int threads = taskCount;
-        if (threads < userSpecifiedCorePoolSize)
-            threads = userSpecifiedCorePoolSize;
-        if (threads > getMaximumPoolSize())
-            threads = getMaximumPoolSize();
-        super.setCorePoolSize(threads);
-    }
-
-    public void setCorePoolSize(int corePoolSize) {
-        synchronized (this) {
-            userSpecifiedCorePoolSize = corePoolSize;
-        }
-    }
-
-    @Override
-    public int getCorePoolSize() {
-        synchronized (this) {
-            return userSpecifiedCorePoolSize;
-        }
-    }
-}
\ No newline at end of file
```
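For readers picking up the new API, here is a minimal usage sketch. It is not part of this commit; it only uses the constructors and execute() behavior added in CascadedThreadPoolExecutor.java above. The demo class name, thread-name prefix and job counts are made up for illustration, and the example assumes the burst of jobs fits within the shared fallback pool's capacity (16-128 threads, queue of 4).

```java
import java.util.concurrent.TimeUnit;

import org.openslx.util.CascadedThreadPoolExecutor;

// Hypothetical demo class, not part of the commit.
public class CascadeDemo
{
	public static void main( String[] args ) throws InterruptedException
	{
		// Conservative per-purpose pool: 2 core threads, at most 8, queue of 16 jobs.
		// Once all 8 threads are busy and the queue is full, further jobs spill over
		// into the shared fallback pool instead of being rejected right away.
		CascadedThreadPoolExecutor pool = new CascadedThreadPoolExecutor(
				2, 8, 1, TimeUnit.MINUTES, 16, "demo" );

		for ( int i = 0; i < 50; ++i ) {
			final int jobId = i;
			pool.execute( () -> {
				try {
					Thread.sleep( 50 ); // simulate a short burst of work
				} catch ( InterruptedException e ) {
					Thread.currentThread().interrupt();
				}
				System.out.println( "Job " + jobId + " ran on " + Thread.currentThread().getName() );
			} );
		}

		// If the shared fallback pool were saturated as well, execute() would throw
		// RejectedExecutionException (or invoke a custom handler, if one was passed).
		pool.shutdown();
		pool.awaitTermination( 1, TimeUnit.MINUTES );
	}
}
```

With these numbers, roughly the first two dozen jobs stay on the small "demo" pool and the overflow runs on the shared fallback threads; the exact split depends on timing, and the printed thread names reflect whatever naming PrioThreadFactory applies.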