在前:之前使用SpringMVC,现在有项目使用Jfinal。
现在的项目对多线程这块使用的比较多,所以线程池就需要一些考虑。SpringMVC中有TaskExecutor,想着根据这个思路给Jfinal来个插件。因为之前没有写过插件,所以可能考虑的不周。我先本地测试了下,线程池的管理没有测出比较大的问题。
之前@JFinal 有提过,一般的线程池在使用tomcat或者Jetty管理。
有需要的可以试试,新鲜出炉,没有特别的测试。
链接: https://pan.baidu.com/s/1eSx87p4 密码: zvvn
1.再AppConf.java中使用的configPlugin()
TaskPlugin taskPlugin = new TaskPlugin(); me.add(taskPlugin);
2.使用的话,这样使用:
TaskKit.taskExecutor.execute(new Runnable() { @Override public void run() { //println(); } });
3.配置文件中配置:
task.core_pool_size=200 task.max_pool_size=10000 task.queue_capacity=500 task.keep_alive_seconds=30000
4.TaskPlugin的大体代码:
private boolean isStarted = false; private JfinalTaskExecutor jfinalTaskExecutor; @Override public boolean start() { if (isStarted) return true; jfinalTaskExecutor = new JfinalTaskExecutor(); jfinalTaskExecutor.setCorePoolSize(PropKit.getInt("task.core_pool_size")); jfinalTaskExecutor.setKeepAliveSeconds(PropKit.getInt("task.keep_alive_seconds")); jfinalTaskExecutor.setQueueCapacity(PropKit.getInt("task.queue_capacity")); jfinalTaskExecutor.setMaxPoolSize( PropKit.getInt("task.max_pool_size")); jfinalTaskExecutor.afterPropertiesSet(); TaskKit.setTaskExecutor(jfinalTaskExecutor); isStarted = true; return true; } @Override public boolean stop() { jfinalTaskExecutor.shutdown(); isStarted = false; return true; }
补充-----------------------
5.重要的JfinalTaskExecutor类
/** * @Title: TaskExecutor.java * @Package com.jfinal.plugin * @Description: TODO * Copyright: Copyright (c) 2016 * Company:QDUM * * @author qdum-ivy * @date 2016年9月13日 上午10:36:42 * @version V1.0 */ package com.jfinal.plugin.task.support; import java.io.Serializable; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.RuntimeErrorException; import com.jfinal.plugin.IPlugin; /** * @ClassName: TaskExecutor * @Description: TODO * @author qdum-ivy * @date 2016年9月13日 上午10:36:42 * */ public class JfinalTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor, Serializable { private static final long serialVersionUID = 8691977770206223903L; private final Object poolSizeMonitor = new Object(); private int corePoolSize = 1; private int maxPoolSize = Integer.MAX_VALUE; private int keepAliveSeconds = 60; private boolean allowCoreThreadTimeOut = false; private int queueCapacity = Integer.MAX_VALUE; private ThreadPoolExecutor threadPoolExecutor; public void setCorePoolSize(int corePoolSize) { synchronized (this.poolSizeMonitor) { this.corePoolSize = corePoolSize; if (this.threadPoolExecutor != null) { this.threadPoolExecutor.setCorePoolSize(corePoolSize); } } } public int getCorePoolSize() { synchronized (this.poolSizeMonitor) { return this.corePoolSize; } } public void setMaxPoolSize(int maxPoolSize) { synchronized (this.poolSizeMonitor) { this.maxPoolSize = maxPoolSize; if (this.threadPoolExecutor != null) { this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize); } } } public int getMaxPoolSize() { synchronized (this.poolSizeMonitor) { return this.maxPoolSize; } } public void setKeepAliveSeconds(int keepAliveSeconds) { synchronized (this.poolSizeMonitor) { this.keepAliveSeconds = keepAliveSeconds; if (this.threadPoolExecutor != null) { this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS); } } } public int getKeepAliveSeconds() { synchronized (this.poolSizeMonitor) { return this.keepAliveSeconds; } } public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; } public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; } protected BlockingQueue<Runnable> createQueue(int queueCapacity) { if (queueCapacity > 0) { return new LinkedBlockingQueue<Runnable>(queueCapacity); } else { return new SynchronousQueue<Runnable>(); } } public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException { if (threadPoolExecutor==null) { new RuntimeException("threadPoolExecutor == null"); } return this.threadPoolExecutor; } public int getPoolSize() { return getThreadPoolExecutor().getPoolSize(); } public int getActiveCount() { return getThreadPoolExecutor().getActiveCount(); } public void execute(Runnable task) { Executor executor = getThreadPoolExecutor(); try { executor.execute(task); } catch (RuntimeErrorException ex) { throw new RuntimeException("execute error "+ex); } } public void execute(Runnable task, long startTimeout) { execute(task); } public Future<?> submit(Runnable task) { ExecutorService executor = getThreadPoolExecutor(); try { return executor.submit(task); } catch (RejectedExecutionException ex) { throw new RuntimeException("execute error "+ex); } } public <T> Future<T> submit(Callable<T> task) { ExecutorService executor = getThreadPoolExecutor(); try { return executor.submit(task); } catch (RejectedExecutionException ex) { throw new RuntimeException("execute error "+ex); } } public boolean prefersShortLivedTasks() { return true; } }
6.父类:ExecutorConfigurationSupport.java
import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @SuppressWarnings("serial") public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory{ protected final Log logger = LogFactory.getLog(getClass()); private ThreadFactory threadFactory = this; private boolean threadNamePrefixSet = false; private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); private boolean waitForTasksToCompleteOnShutdown = false; private int awaitTerminationSeconds = 0; private String beanName; private ExecutorService executor; public void setThreadFactory(ThreadFactory threadFactory) { this.threadFactory = (threadFactory != null ? threadFactory : this); } @Override public void setThreadNamePrefix(String threadNamePrefix) { super.setThreadNamePrefix(threadNamePrefix); this.threadNamePrefixSet = true; } public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) { this.rejectedExecutionHandler = (rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy()); } public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) { this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown; } public void setAwaitTerminationSeconds(int awaitTerminationSeconds) { this.awaitTerminationSeconds = awaitTerminationSeconds; } public void setBeanName(String name) { this.beanName = name; } /** * Calls {@code initialize()} after the container applied all property values. * @see #initialize() */ public void afterPropertiesSet() { initialize(); } /** * Set up the ExecutorService. */ public void initialize() { if (logger.isInfoEnabled()) { logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (!this.threadNamePrefixSet && this.beanName != null) { setThreadNamePrefix(this.beanName + "-"); } this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); } protected abstract ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler); public void destroy() { shutdown(); } public void shutdown() { if (logger.isInfoEnabled()) { logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (this.waitForTasksToCompleteOnShutdown) { this.executor.shutdown(); } else { this.executor.shutdownNow(); } awaitTerminationIfNecessary(); } private void awaitTerminationIfNecessary() { if (this.awaitTerminationSeconds > 0) { try { if (!this.executor.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) { if (logger.isWarnEnabled()) { logger.warn("Timed out while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate"); } } } catch (InterruptedException ex) { if (logger.isWarnEnabled()) { logger.warn("Interrupted while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate"); } Thread.currentThread().interrupt(); } } } }
这两个类大体管理了生命周期。另外还有一些类四五个类代码太多,等找时间整理下一块打包发出来。