在前:之前使用SpringMVC,现在有项目使用Jfinal。
现在的项目对多线程这块使用的比较多,所以线程池就需要一些考虑。SpringMVC中有TaskExecutor,想着根据这个思路给Jfinal来个插件。因为之前没有写过插件,所以可能考虑的不周。我先本地测试了下,线程池的管理没有测出比较大的问题。
之前@JFinal 有提过,一般的线程池在使用tomcat或者Jetty管理。
有需要的可以试试,新鲜出炉,没有特别的测试。
链接: https://pan.baidu.com/s/1eSx87p4 密码: zvvn
1.再AppConf.java中使用的configPlugin()
TaskPlugin taskPlugin = new TaskPlugin(); me.add(taskPlugin);
2.使用的话,这样使用:
TaskKit.taskExecutor.execute(new Runnable() {
@Override
public void run() {
//println();
}
});3.配置文件中配置:
task.core_pool_size=200 task.max_pool_size=10000 task.queue_capacity=500 task.keep_alive_seconds=30000
4.TaskPlugin的大体代码:
private boolean isStarted = false;
private JfinalTaskExecutor jfinalTaskExecutor;
@Override
public boolean start() {
if (isStarted)
return true;
jfinalTaskExecutor = new JfinalTaskExecutor();
jfinalTaskExecutor.setCorePoolSize(PropKit.getInt("task.core_pool_size"));
jfinalTaskExecutor.setKeepAliveSeconds(PropKit.getInt("task.keep_alive_seconds"));
jfinalTaskExecutor.setQueueCapacity(PropKit.getInt("task.queue_capacity"));
jfinalTaskExecutor.setMaxPoolSize( PropKit.getInt("task.max_pool_size"));
jfinalTaskExecutor.afterPropertiesSet();
TaskKit.setTaskExecutor(jfinalTaskExecutor);
isStarted = true;
return true;
}
@Override
public boolean stop() {
jfinalTaskExecutor.shutdown();
isStarted = false;
return true;
}补充-----------------------
5.重要的JfinalTaskExecutor类
/**
* @Title: TaskExecutor.java
* @Package com.jfinal.plugin
* @Description: TODO
* Copyright: Copyright (c) 2016
* Company:QDUM
*
* @author qdum-ivy
* @date 2016年9月13日 上午10:36:42
* @version V1.0
*/
package com.jfinal.plugin.task.support;
import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.RuntimeErrorException;
import com.jfinal.plugin.IPlugin;
/**
* @ClassName: TaskExecutor
* @Description: TODO
* @author qdum-ivy
* @date 2016年9月13日 上午10:36:42
*
*/
public class JfinalTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor, Serializable {
private static final long serialVersionUID = 8691977770206223903L;
private final Object poolSizeMonitor = new Object();
private int corePoolSize = 1;
private int maxPoolSize = Integer.MAX_VALUE;
private int keepAliveSeconds = 60;
private boolean allowCoreThreadTimeOut = false;
private int queueCapacity = Integer.MAX_VALUE;
private ThreadPoolExecutor threadPoolExecutor;
public void setCorePoolSize(int corePoolSize) {
synchronized (this.poolSizeMonitor) {
this.corePoolSize = corePoolSize;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setCorePoolSize(corePoolSize);
}
}
}
public int getCorePoolSize() {
synchronized (this.poolSizeMonitor) {
return this.corePoolSize;
}
}
public void setMaxPoolSize(int maxPoolSize) {
synchronized (this.poolSizeMonitor) {
this.maxPoolSize = maxPoolSize;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
}
}
}
public int getMaxPoolSize() {
synchronized (this.poolSizeMonitor) {
return this.maxPoolSize;
}
}
public void setKeepAliveSeconds(int keepAliveSeconds) {
synchronized (this.poolSizeMonitor) {
this.keepAliveSeconds = keepAliveSeconds;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
}
}
}
public int getKeepAliveSeconds() {
synchronized (this.poolSizeMonitor) {
return this.keepAliveSeconds;
}
}
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
if (queueCapacity > 0) {
return new LinkedBlockingQueue<Runnable>(queueCapacity);
}
else {
return new SynchronousQueue<Runnable>();
}
}
public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
if (threadPoolExecutor==null) {
new RuntimeException("threadPoolExecutor == null");
}
return this.threadPoolExecutor;
}
public int getPoolSize() {
return getThreadPoolExecutor().getPoolSize();
}
public int getActiveCount() {
return getThreadPoolExecutor().getActiveCount();
}
public void execute(Runnable task) {
Executor executor = getThreadPoolExecutor();
try {
executor.execute(task);
}
catch (RuntimeErrorException ex) {
throw new RuntimeException("execute error "+ex);
}
}
public void execute(Runnable task, long startTimeout) {
execute(task);
}
public Future<?> submit(Runnable task) {
ExecutorService executor = getThreadPoolExecutor();
try {
return executor.submit(task);
}
catch (RejectedExecutionException ex) {
throw new RuntimeException("execute error "+ex);
}
}
public <T> Future<T> submit(Callable<T> task) {
ExecutorService executor = getThreadPoolExecutor();
try {
return executor.submit(task);
}
catch (RejectedExecutionException ex) {
throw new RuntimeException("execute error "+ex);
}
}
public boolean prefersShortLivedTasks() {
return true;
}
}6.父类:ExecutorConfigurationSupport.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@SuppressWarnings("serial")
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory{
protected final Log logger = LogFactory.getLog(getClass());
private ThreadFactory threadFactory = this;
private boolean threadNamePrefixSet = false;
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
private boolean waitForTasksToCompleteOnShutdown = false;
private int awaitTerminationSeconds = 0;
private String beanName;
private ExecutorService executor;
public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = (threadFactory != null ? threadFactory : this);
}
@Override
public void setThreadNamePrefix(String threadNamePrefix) {
super.setThreadNamePrefix(threadNamePrefix);
this.threadNamePrefixSet = true;
}
public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler =
(rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
}
public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
}
public void setAwaitTerminationSeconds(int awaitTerminationSeconds) {
this.awaitTerminationSeconds = awaitTerminationSeconds;
}
public void setBeanName(String name) {
this.beanName = name;
}
/**
* Calls {@code initialize()} after the container applied all property values.
* @see #initialize()
*/
public void afterPropertiesSet() {
initialize();
}
/**
* Set up the ExecutorService.
*/
public void initialize() {
if (logger.isInfoEnabled()) {
logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (!this.threadNamePrefixSet && this.beanName != null) {
setThreadNamePrefix(this.beanName + "-");
}
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
protected abstract ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);
public void destroy() {
shutdown();
}
public void shutdown() {
if (logger.isInfoEnabled()) {
logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
}
else {
this.executor.shutdownNow();
}
awaitTerminationIfNecessary();
}
private void awaitTerminationIfNecessary() {
if (this.awaitTerminationSeconds > 0) {
try {
if (!this.executor.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) {
if (logger.isWarnEnabled()) {
logger.warn("Timed out while waiting for executor" +
(this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
}
}
}
catch (InterruptedException ex) {
if (logger.isWarnEnabled()) {
logger.warn("Interrupted while waiting for executor" +
(this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
}
Thread.currentThread().interrupt();
}
}
}
}这两个类大体管理了生命周期。另外还有一些类四五个类代码太多,等找时间整理下一块打包发出来。