基于Jfinal的TaskExecutor,看有没有人用啊。

写在前面:之前使用SpringMVC,现在有项目使用Jfinal。

现在的项目对多线程这块使用得比较多,所以线程池就需要一些考虑。SpringMVC中有TaskExecutor,想着根据这个思路给Jfinal来个插件。因为之前没有写过插件,所以可能考虑得不周。我先在本地测试了下,线程池的管理没有测出比较大的问题。

    之前@JFinal 有提过,一般情况下线程池是交给Tomcat或者Jetty来管理的。


有需要的可以试试,新鲜出炉,没有特别的测试。


链接: https://pan.baidu.com/s/1eSx87p4 密码: zvvn


1.在AppConf.java的configPlugin()中添加插件:


  1. TaskPlugin taskPlugin = new TaskPlugin();
  2. me.add(taskPlugin);


2.使用的话,这样使用:

  1. TaskKit.taskExecutor.execute(new Runnable() {
  2. @Override
  3. public void run() {
  4.  //println();
  5. }
  6. });

3.配置文件中配置:

  1. task.core_pool_size=200
  2. task.max_pool_size=10000
  3. task.queue_capacity=500
  4. task.keep_alive_seconds=30000

4.TaskPlugin的大体代码:

  1.  
  2. private boolean isStarted = false;
  3. private JfinalTaskExecutor jfinalTaskExecutor;
  4. @Override
  5. public boolean start() {
  6. if (isStarted)
  7. return true;
  8. jfinalTaskExecutor = new JfinalTaskExecutor();
  9. jfinalTaskExecutor.setCorePoolSize(PropKit.getInt("task.core_pool_size"));
  10. jfinalTaskExecutor.setKeepAliveSeconds(PropKit.getInt("task.keep_alive_seconds"));
  11. jfinalTaskExecutor.setQueueCapacity(PropKit.getInt("task.queue_capacity"));
  12. jfinalTaskExecutor.setMaxPoolSize( PropKit.getInt("task.max_pool_size"));
  13. jfinalTaskExecutor.afterPropertiesSet();
  14. TaskKit.setTaskExecutor(jfinalTaskExecutor);
  15. isStarted = true;
  16. return true;
  17. }
  18.  
  19. @Override
  20. public boolean stop() {
  21. jfinalTaskExecutor.shutdown();
  22. isStarted = false;
  23. return true;
  24. }


补充-----------------------

5.重要的JfinalTaskExecutor类

  1. /**
  2.  * @Title: TaskExecutor.java
  3.  * @Package com.jfinal.plugin
  4.  * @Description: TODO
  5.  * Copyright: Copyright (c) 2016 
  6.  * Company:QDUM
  7.  * 
  8.  * @author qdum-ivy
  9.  * @date 2016年9月13日 上午10:36:42
  10.  * @version V1.0
  11.  */
  12.  
  13. package com.jfinal.plugin.task.support;
  14.  
  15. import java.io.Serializable;
  16. import java.util.concurrent.BlockingQueue;
  17. import java.util.concurrent.Callable;
  18. import java.util.concurrent.Executor;
  19. import java.util.concurrent.ExecutorService;
  20. import java.util.concurrent.Future;
  21. import java.util.concurrent.LinkedBlockingQueue;
  22. import java.util.concurrent.RejectedExecutionException;
  23. import java.util.concurrent.RejectedExecutionHandler;
  24. import java.util.concurrent.SynchronousQueue;
  25. import java.util.concurrent.ThreadFactory;
  26. import java.util.concurrent.ThreadPoolExecutor;
  27. import java.util.concurrent.TimeUnit;
  28.  
  29. import javax.management.RuntimeErrorException;
  30.  
  31. import com.jfinal.plugin.IPlugin;
  32.  
  33.  
  34. /**
  35.   * @ClassName: TaskExecutor
  36.   * @Description: TODO
  37.   * @author qdum-ivy
  38.   * @date 2016年9月13日 上午10:36:42
  39.   *
  40.   */
  41.  
  42. public class JfinalTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor, Serializable {
  43.  
  44. private static final long serialVersionUID = 8691977770206223903L;
  45.  
  46. private final Object poolSizeMonitor = new Object();
  47.  
  48. private int corePoolSize = 1;
  49.  
  50. private int maxPoolSize = Integer.MAX_VALUE;
  51.  
  52. private int keepAliveSeconds = 60;
  53.  
  54. private boolean allowCoreThreadTimeOut = false;
  55.  
  56. private int queueCapacity = Integer.MAX_VALUE;
  57.  
  58. private ThreadPoolExecutor threadPoolExecutor;
  59.  
  60.  
  61. public void setCorePoolSize(int corePoolSize) {
  62. synchronized (this.poolSizeMonitor) {
  63. this.corePoolSize = corePoolSize;
  64. if (this.threadPoolExecutor != null) {
  65. this.threadPoolExecutor.setCorePoolSize(corePoolSize);
  66. }
  67. }
  68. }
  69.  
  70. public int getCorePoolSize() {
  71. synchronized (this.poolSizeMonitor) {
  72. return this.corePoolSize;
  73. }
  74. }
  75.  
  76. public void setMaxPoolSize(int maxPoolSize) {
  77. synchronized (this.poolSizeMonitor) {
  78. this.maxPoolSize = maxPoolSize;
  79. if (this.threadPoolExecutor != null) {
  80. this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
  81. }
  82. }
  83. }
  84.  
  85. public int getMaxPoolSize() {
  86. synchronized (this.poolSizeMonitor) {
  87. return this.maxPoolSize;
  88. }
  89. }
  90.  
  91. public void setKeepAliveSeconds(int keepAliveSeconds) {
  92. synchronized (this.poolSizeMonitor) {
  93. this.keepAliveSeconds = keepAliveSeconds;
  94. if (this.threadPoolExecutor != null) {
  95. this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
  96. }
  97. }
  98. }
  99.  
  100. public int getKeepAliveSeconds() {
  101. synchronized (this.poolSizeMonitor) {
  102. return this.keepAliveSeconds;
  103. }
  104. }
  105.  
  106. public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
  107. this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
  108. }
  109.  
  110. public void setQueueCapacity(int queueCapacity) {
  111. this.queueCapacity = queueCapacity;
  112. }
  113.  
  114.  
  115. protected ExecutorService initializeExecutor(
  116. ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
  117.  
  118. BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
  119. ThreadPoolExecutor executor  = new ThreadPoolExecutor(
  120. this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
  121. queue, threadFactory, rejectedExecutionHandler);
  122. if (this.allowCoreThreadTimeOut) {
  123. executor.allowCoreThreadTimeOut(true);
  124. }
  125.  
  126. this.threadPoolExecutor = executor;
  127. return executor;
  128. }
  129.  
  130. protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
  131. if (queueCapacity > 0) {
  132. return new LinkedBlockingQueue<Runnable>(queueCapacity);
  133. }
  134. else {
  135. return new SynchronousQueue<Runnable>();
  136. }
  137. }
  138.  
  139. public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
  140. if (threadPoolExecutor==null) {
  141. new RuntimeException("threadPoolExecutor == null");
  142. }
  143. return this.threadPoolExecutor;
  144. }
  145.  
  146. public int getPoolSize() {
  147. return getThreadPoolExecutor().getPoolSize();
  148. }
  149.  
  150. public int getActiveCount() {
  151. return getThreadPoolExecutor().getActiveCount();
  152. }
  153.  
  154.  
  155. public void execute(Runnable task) {
  156. Executor executor = getThreadPoolExecutor();
  157. try {
  158. executor.execute(task);
  159. }
  160. catch (RuntimeErrorException ex) {
  161. throw new RuntimeException("execute error "+ex);
  162. }
  163. }
  164.  
  165. public void execute(Runnable task, long startTimeout) {
  166. execute(task);
  167. }
  168.  
  169. public Future<?> submit(Runnable task) {
  170. ExecutorService executor = getThreadPoolExecutor();
  171. try {
  172. return executor.submit(task);
  173. }
  174. catch (RejectedExecutionException ex) {
  175. throw new RuntimeException("execute error "+ex);
  176. }
  177. }
  178.  
  179. public <T> Future<T> submit(Callable<T> task) {
  180. ExecutorService executor = getThreadPoolExecutor();
  181. try {
  182. return executor.submit(task);
  183. }
  184. catch (RejectedExecutionException ex) {
  185. throw new RuntimeException("execute error "+ex);
  186. }
  187. }
  188.  
  189. public boolean prefersShortLivedTasks() {
  190. return true;
  191. }
  192.  
  193. }

6.父类:ExecutorConfigurationSupport.java

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.RejectedExecutionHandler;
  3. import java.util.concurrent.ThreadFactory;
  4. import java.util.concurrent.ThreadPoolExecutor;
  5. import java.util.concurrent.TimeUnit;
  6.  
  7. import org.apache.commons.logging.Log;
  8. import org.apache.commons.logging.LogFactory;
  9.  
  10. @SuppressWarnings("serial")
  11. public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory{
  12.  
  13. protected final Log logger = LogFactory.getLog(getClass());
  14.  
  15. private ThreadFactory threadFactory = this;
  16.  
  17. private boolean threadNamePrefixSet = false;
  18.  
  19. private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
  20.  
  21. private boolean waitForTasksToCompleteOnShutdown = false;
  22.  
  23. private int awaitTerminationSeconds = 0;
  24.  
  25. private String beanName;
  26.  
  27. private ExecutorService executor;
  28.  
  29. public void setThreadFactory(ThreadFactory threadFactory) {
  30. this.threadFactory = (threadFactory != null ? threadFactory : this);
  31. }
  32.  
  33. @Override
  34. public void setThreadNamePrefix(String threadNamePrefix) {
  35. super.setThreadNamePrefix(threadNamePrefix);
  36. this.threadNamePrefixSet = true;
  37. }
  38. public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
  39. this.rejectedExecutionHandler =
  40. (rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
  41. }
  42. public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
  43. this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
  44. }
  45.  
  46. public void setAwaitTerminationSeconds(int awaitTerminationSeconds) {
  47. this.awaitTerminationSeconds = awaitTerminationSeconds;
  48. }
  49.  
  50. public void setBeanName(String name) {
  51. this.beanName = name;
  52. }
  53.  
  54.  
  55. /**
  56.  * Calls {@code initialize()} after the container applied all property values.
  57.  * @see #initialize()
  58.  */
  59. public void afterPropertiesSet() {
  60. initialize();
  61. }
  62.  
  63. /**
  64.  * Set up the ExecutorService.
  65.  */
  66. public void initialize() {
  67. if (logger.isInfoEnabled()) {
  68. logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : ""));
  69. }
  70. if (!this.threadNamePrefixSet && this.beanName != null) {
  71. setThreadNamePrefix(this.beanName + "-");
  72. }
  73. this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
  74. }
  75.  
  76. protected abstract ExecutorService initializeExecutor(
  77. ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);
  78.  
  79.  
  80. public void destroy() {
  81. shutdown();
  82. }
  83.  
  84. public void shutdown() {
  85. if (logger.isInfoEnabled()) {
  86. logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
  87. }
  88. if (this.waitForTasksToCompleteOnShutdown) {
  89. this.executor.shutdown();
  90. }
  91. else {
  92. this.executor.shutdownNow();
  93. }
  94. awaitTerminationIfNecessary();
  95. }
  96. private void awaitTerminationIfNecessary() {
  97. if (this.awaitTerminationSeconds > 0) {
  98. try {
  99. if (!this.executor.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) {
  100. if (logger.isWarnEnabled()) {
  101. logger.warn("Timed out while waiting for executor" +
  102. (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
  103. }
  104. }
  105. }
  106. catch (InterruptedException ex) {
  107. if (logger.isWarnEnabled()) {
  108. logger.warn("Interrupted while waiting for executor" +
  109. (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
  110. }
  111. Thread.currentThread().interrupt();
  112. }
  113. }
  114. }
  115.  
  116. }



这两个类大体管理了生命周期。另外还有四五个类,代码太多,等找时间整理一下,一块打包发出来。

评论区

JFinal

2016-09-13 14:43

核心的 TaskKit 没分享出来哈,线程池的创建,生命周期的管理这部分是关键,没有发出来呢

IvyHelen

2016-09-13 14:58

@JFinal 好的。我是根据SpringMVC里面管理的,那我整理下都分享出来。可能牵扯到了好几个类。现在还精简。

IvyHelen

2016-09-13 15:10

这几个类的代码上传到百度云盘了:
链接: https://pan.baidu.com/s/1eSx87p4 密码: zvvn

JFinal

2016-09-13 15:11

@IvyHelen 贴子里面放上这个链接,有需要的朋友可以获取到,感谢支持

hotsmile

2016-12-20 09:29

在使用这个了,谢谢 @IvyHelen

hotsmile

2016-12-20 09:42

@IvyHelen ,我启动就报空指针了
// Snippet from the bug report above: registers the TaskPlugin and then immediately submits a
// task through TaskKit.taskExecutor.
// NOTE(review): the reported NullPointerException is presumably raised here because
// TaskKit.taskExecutor is only assigned inside TaskPlugin.start() (via
// TaskKit.setTaskExecutor), while me.add(...) only registers the plugin - confirm that the
// framework starts plugins after this method returns, and submit tasks only after startup.
@Override
public void addOtherPlugin(Plugins me) {
TaskPlugin taskPlugin=new TaskPlugin();
me.add(taskPlugin);
TaskKit.taskExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("----shdsdhsld");
}
});

// System.out.println("Starting the thread-pool plugin");
}

hotsmile

2016-12-20 10:08

在Taskkit。taskExecuor这个就报错了

IvyHelen

2016-12-22 21:25

@hotsmile 麻烦上传头像,支持社区发展啊。嘿嘿

suruozhong

2018-08-23 14:05

非常好,顶一个,在用了

热门反馈

扫码入社