001package org.apache.commons.jcs3.utils.threadpool;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.util.Iterator;
023import java.util.Map;
024import java.util.Properties;
025import java.util.Set;
026import java.util.concurrent.BlockingQueue;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.ScheduledExecutorService;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034
035import org.apache.commons.jcs3.log.Log;
036import org.apache.commons.jcs3.log.LogManager;
037import org.apache.commons.jcs3.utils.config.PropertySetter;
038
039/**
040 * This manages threadpools for an application
041 * <p>
042 * It is a singleton since threads need to be managed vm wide.
043 * </p>
044 * <p>
045 * This manager forces you to use a bounded queue. By default it uses the current thread for
046 * execution when the buffer is full and no free threads can be created.
047 * </p>
048 * <p>
049 * You can specify the props file to use or pass in a properties object prior to configuration.
050 * </p>
051 * <p>
052 * If set, the Properties object will take precedence.
053 * </p>
054 * <p>
055 * If a value is not set for a particular pool, the hard coded defaults in <code>PoolConfiguration</code> will be used.
056 * You can configure default settings by specifying <code>thread_pool.default</code> in the properties, ie "cache.ccf"
057 * </p>
058 */
059public class ThreadPoolManager
060{
061    /** The logger */
062    private static final Log log = LogManager.getLog( ThreadPoolManager.class );
063
064    /** The default config, created using property defaults if present, else those above. */
065    private PoolConfiguration defaultConfig;
066
067    /** The default scheduler config, created using property defaults if present, else those above. */
068    private PoolConfiguration defaultSchedulerConfig;
069
070    /** the root property name */
071    private static final String PROP_NAME_ROOT = "thread_pool";
072
073    /** default property file name */
074    private static final String DEFAULT_PROP_NAME_ROOT = "thread_pool.default";
075
076    /** the scheduler root property name */
077    private static final String PROP_NAME_SCHEDULER_ROOT = "scheduler_pool";
078
079    /** default scheduler property file name */
080    private static final String DEFAULT_PROP_NAME_SCHEDULER_ROOT = "scheduler_pool.default";
081
082   /**
083     * You can specify the properties to be used to configure the thread pool. Setting this post
084     * initialization will have no effect.
085     */
086    private static volatile Properties props;
087
088    /** Map of names to pools. */
089    private final ConcurrentHashMap<String, ExecutorService> pools;
090
091    /** Map of names to scheduler pools. */
092    private final ConcurrentHashMap<String, ScheduledExecutorService> schedulerPools;
093
094    /**
095     * The ThreadPoolManager instance (holder pattern)
096     */
097    private static class ThreadPoolManagerHolder
098    {
099        static final ThreadPoolManager INSTANCE = new ThreadPoolManager();
100    }
101
102    /**
103     * No instances please. This is a singleton.
104     */
105    private ThreadPoolManager()
106    {
107        this.pools = new ConcurrentHashMap<>();
108        this.schedulerPools = new ConcurrentHashMap<>();
109        configure();
110    }
111
112    /**
113     * Creates a pool based on the configuration info.
114     * <p>
115     * @param config the pool configuration
116     * @param threadNamePrefix prefix for the thread names of the pool
117     * @return A ThreadPool wrapper
118     */
119    public ExecutorService createPool( final PoolConfiguration config, final String threadNamePrefix)
120    {
121        return createPool(config, threadNamePrefix, Thread.NORM_PRIORITY);
122    }
123
124    /**
125     * Creates a pool based on the configuration info.
126     * <p>
127     * @param config the pool configuration
128     * @param threadNamePrefix prefix for the thread names of the pool
129     * @param threadPriority the priority of the created threads
130     * @return A ThreadPool wrapper
131     */
132    public ExecutorService createPool( final PoolConfiguration config, final String threadNamePrefix, final int threadPriority )
133    {
134        BlockingQueue<Runnable> queue = null;
135        if ( config.isUseBoundary() )
136        {
137            log.debug( "Creating a Bounded Buffer to use for the pool" );
138            queue = new LinkedBlockingQueue<>(config.getBoundarySize());
139        }
140        else
141        {
142            log.debug( "Creating a non bounded Linked Queue to use for the pool" );
143            queue = new LinkedBlockingQueue<>();
144        }
145
146        final ThreadPoolExecutor pool = new ThreadPoolExecutor(
147            config.getStartUpSize(),
148            config.getMaximumPoolSize(),
149            config.getKeepAliveTime(),
150            TimeUnit.MILLISECONDS,
151            queue,
152            new DaemonThreadFactory(threadNamePrefix, threadPriority));
153
154        // when blocked policy
155        switch (config.getWhenBlockedPolicy())
156        {
157            case ABORT:
158                pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
159                break;
160
161            case RUN:
162                pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
163                break;
164
165            case WAIT:
166                throw new RuntimeException("POLICY_WAIT no longer supported");
167
168            case DISCARDOLDEST:
169                pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
170                break;
171
172            default:
173                break;
174        }
175
176        pool.prestartAllCoreThreads();
177
178        return pool;
179    }
180
181    /**
182     * Creates a scheduler pool based on the configuration info.
183     * <p>
184     * @param config the pool configuration
185     * @param threadNamePrefix prefix for the thread names of the pool
186     * @param threadPriority the priority of the created threads
187     * @return A ScheduledExecutorService
188     */
189    public ScheduledExecutorService createSchedulerPool( final PoolConfiguration config, final String threadNamePrefix, final int threadPriority )
190    {
191
192        return Executors.newScheduledThreadPool(
193                config.getMaximumPoolSize(),
194                new DaemonThreadFactory(threadNamePrefix, threadPriority));
195    }
196
197    /**
198     * Returns a configured instance of the ThreadPoolManger To specify a configuration file or
199     * Properties object to use call the appropriate setter prior to calling getInstance.
200     * <p>
201     * @return The single instance of the ThreadPoolManager
202     */
203    public static ThreadPoolManager getInstance()
204    {
205        return ThreadPoolManagerHolder.INSTANCE;
206    }
207
208    /**
209     * Dispose of the instance of the ThreadPoolManger and shut down all thread pools
210     */
211    public static void dispose()
212    {
213        for ( final Iterator<Map.Entry<String, ExecutorService>> i =
214                getInstance().pools.entrySet().iterator(); i.hasNext(); )
215        {
216            final Map.Entry<String, ExecutorService> entry = i.next();
217            try
218            {
219                entry.getValue().shutdownNow();
220            }
221            catch (final Throwable t)
222            {
223                log.warn("Failed to close pool {0}", entry.getKey(), t);
224            }
225            i.remove();
226        }
227
228        for ( final Iterator<Map.Entry<String, ScheduledExecutorService>> i =
229                getInstance().schedulerPools.entrySet().iterator(); i.hasNext(); )
230        {
231            final Map.Entry<String, ScheduledExecutorService> entry = i.next();
232            try
233            {
234                entry.getValue().shutdownNow();
235            }
236            catch (final Throwable t)
237            {
238                log.warn("Failed to close pool {0}", entry.getKey(), t);
239            }
240            i.remove();
241        }
242    }
243
244    /**
245     * Returns an executor service by name. If a service by this name does not exist in the configuration file or
246     * properties, one will be created using the default values.
247     * <p>
248     * Services are lazily created.
249     * <p>
250     * @param name
251     * @return The executor service configured for the name.
252     */
253    public ExecutorService getExecutorService( final String name )
254    {
255        return pools.computeIfAbsent(name, key -> {
256            log.debug( "Creating pool for name [{0}]", key );
257            final PoolConfiguration config = loadConfig( PROP_NAME_ROOT + "." + key, defaultConfig );
258            return createPool( config, "JCS-ThreadPoolManager-" + key + "-" );
259        });
260    }
261
262    /**
263     * Returns a scheduler pool by name. If a pool by this name does not exist in the configuration file or
264     * properties, one will be created using the default values.
265     * <p>
266     * Pools are lazily created.
267     * <p>
268     * @param name
269     * @return The scheduler pool configured for the name.
270     */
271    public ScheduledExecutorService getSchedulerPool( final String name )
272    {
273        return schedulerPools.computeIfAbsent(name, key -> {
274            log.debug( "Creating scheduler pool for name [{0}]", key );
275            final PoolConfiguration config = loadConfig( PROP_NAME_SCHEDULER_ROOT + "." + key,
276                    defaultSchedulerConfig );
277            return createSchedulerPool( config, "JCS-ThreadPoolManager-" + key + "-", Thread.NORM_PRIORITY );
278        });
279    }
280
281    /**
282     * Returns the names of all configured pools.
283     * <p>
284     * @return ArrayList of string names
285     */
286    protected Set<String> getPoolNames()
287    {
288        return pools.keySet();
289    }
290
291    /**
292     * This will be used if it is not null on initialization. Setting this post initialization will
293     * have no effect.
294     * <p>
295     * @param props The props to set.
296     */
297    public static void setProps( final Properties props )
298    {
299        ThreadPoolManager.props = props;
300    }
301
302    /**
303     * Initialize the ThreadPoolManager and create all the pools defined in the configuration.
304     */
305    private void configure()
306    {
307        log.debug( "Initializing ThreadPoolManager" );
308
309        if ( props == null )
310        {
311            log.warn( "No configuration settings found. Using hardcoded default values for all pools." );
312            props = new Properties();
313        }
314
315        // set initial default and then override if new settings are available
316        defaultConfig = loadConfig( DEFAULT_PROP_NAME_ROOT, new PoolConfiguration() );
317        defaultSchedulerConfig = loadConfig( DEFAULT_PROP_NAME_SCHEDULER_ROOT, new PoolConfiguration() );
318    }
319
320    /**
321     * Configures the PoolConfiguration settings.
322     * <p>
323     * @param root the configuration key prefix
324     * @param defaultPoolConfiguration the default configuration
325     * @return PoolConfiguration
326     */
327    private static PoolConfiguration loadConfig( final String root, final PoolConfiguration defaultPoolConfiguration )
328    {
329        final PoolConfiguration config = defaultPoolConfiguration.clone();
330        PropertySetter.setProperties( config, props, root + "." );
331
332        log.debug( "{0} PoolConfiguration = {1}", root, config );
333
334        return config;
335    }
336}