summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/util/CascadedThreadPoolExecutor.java
blob: 005f45f201469a668972c2136b8e5ae30def393f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package org.openslx.util;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Cascaded thread pool. A thread pool behaving mostly like a normal
 * TPE, until it is saturated and would reject execution. In that case,
 * it will try to run the job in a global, shared fallback thread pool,
 * and only reject execution if this also fails.
 * The reasoning is that you can define rather small thread pools for
 * different jobs, without having to use particularly high maximumPoolSize
 * for cases of sudden high load. If you have a dozen thread pools that can
 * grow to hundreds of threads, worst case you suddenly have a thousand
 * threads around using up memory and everything's messed up. Instead,
 * use conservative values like 8 or 16 as the maximum size, and rely on
 * the CascadedThreadPoolExecutor to take load spikes. So, even if multiple
 * parts of your application suddenly are hit with an unexpectedly high
 * load, the overall number of threads can be kept within reasonable bounds
 * and OOM situations are less likely to occur.
 * Also, in case some part of the application saturated the shared pool
 * for extended periods of time, other "well behaving" parts of your
 * application can still make progress with their small pools, in contrast
 * to a design where everything in your application shares one giant
 * thread pool directly.
 */
public class CascadedThreadPoolExecutor extends ThreadPoolExecutor
{

	private static final RejectedExecutionHandler defaultHandler = new RejectedExecutionHandler() {
		@Override
		public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
		{
			FALLBACK_TPE.execute( r );
		}
	};

	private static final ThreadPoolExecutor FALLBACK_TPE = new ThreadPoolExecutor( 16, 128,
			1, TimeUnit.MINUTES,
			new LinkedBlockingDeque<Runnable>( 4 ),
			new PrioThreadFactory( "FallbackTP" ),
			new AbortPolicy() );

	static {
		FALLBACK_TPE.allowCoreThreadTimeOut( true );
	}

	/**
	 * The RejectedExecutionHandler of this pool. We need to trigger this if the shared pool rejected
	 * the execution by throwing a RejectedExecutionException.
	 */
	private RejectedExecutionHandler customRejectionHandler = null;

	public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime,
			TimeUnit unit, BlockingQueue<Runnable> queue, String threadNamePrefix )
	{
		this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue,
				new PrioThreadFactory( threadNamePrefix ), null );
	}

	public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime,
			TimeUnit unit, int queueSize, String threadNamePrefix )
	{
		this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, null, threadNamePrefix );
	}

	public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			int queueSize, ThreadFactory threadFactory )
	{
		this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, threadFactory, null );
	}

	public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			int queueSize, RejectedExecutionHandler handler, String threadNamePrefix )
	{
		this( corePoolSize, maximumPoolSize, keepAliveTime, unit, queueSize, new PrioThreadFactory( threadNamePrefix ),
				handler );
	}

	public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			int queueSize, ThreadFactory threadFactory, RejectedExecutionHandler handler )
	{
		this( corePoolSize, maximumPoolSize, keepAliveTime, unit, new ArrayBlockingQueue<Runnable>( queueSize ),
				threadFactory, handler );
	}

	public CascadedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler handler )
	{
		// Only in super() call pass defaultHandler, not in this() calls!
		super( corePoolSize, maximumPoolSize, keepAliveTime, unit, queue,
				threadFactory, defaultHandler );
		this.customRejectionHandler = handler;
	}

	@Override
	public void execute( Runnable command )
	{
		try {
			super.execute( command );
		} catch ( RejectedExecutionException e ) {
			if ( customRejectionHandler == null || ( customRejectionHandler.getClass().equals( AbortPolicy.class ) ) ) {
				throw e;
			} else {
				customRejectionHandler.rejectedExecution( command, this );
			}
		}
	}

}