Jfinal引入Atomikos的后续完善(2)

3)事务运行过程中不可强制杀死线程,改写缩减SchedulerPlugin

package com.csnt.source.plugin.scheduledthread;

import com.jfinal.kit.Prop;
import com.jfinal.kit.PropKit;
import com.jfinal.kit.StrKit;
import com.jfinal.log.Log;
import com.jfinal.plugin.IPlugin;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * 只保留ScheduledThreadPoolExecutor的代码逻辑
 * @author source
 */
@SuppressWarnings("Duplicates")
public class ScheduledThreadPlugin implements IPlugin {
    private static Log LOG = Log.getLog("scheduler");


    /**
     * 调度线程池
     */
    private int scheduledThreadPoolSize = 10;

    /**
     * ScheduledThreadPoolExecutor调度器
     */
    private ScheduledThreadPoolExecutor fixedScheduler;


    /**
     * 调度任务配置文件
     */
    private final String jobConfigFile;


    /**
     * 是否有ScheduledThreadPoolExecutor任务
     */
    private boolean hasFixedJob = false;

    /**
     * <p>Title: SchedulerPlugin</p>
     * <p>Description: 构造函数(指定调度线程池大小、调度任务配置文件和扫描路径)</p>
     *
     * @param scheduledThreadPoolSize 调度线程池大小
     * @param jobConfigFile           调度任务配置文件
     * @since V1.0.0
     */
    private ScheduledThreadPlugin(int scheduledThreadPoolSize, String jobConfigFile) {
        this.scheduledThreadPoolSize = scheduledThreadPoolSize;
        this.jobConfigFile = jobConfigFile;
    }


    /**
     * @Title: ensurFixedScheduler
     * @Description: 确保fixedScheduler可用
     * @since V1.0.0
     */
    private void ensurFixedScheduler() {
        if (this.fixedScheduler == null) {
            synchronized (this) {
                if (this.fixedScheduler == null) {
                    this.fixedScheduler = new ScheduledThreadPoolExecutor(scheduledThreadPoolSize);
                }
            }
        }
    }

    /**
     * @param job                 定期执行的任务
     * @param initialDelaySeconds 启动延迟时间
     * @param periodSeconds       每次执行任务的间隔时间(单位秒)
     * @return
     * @Title: scheduleAtFixedRate
     * @Description: 延迟指定秒后启动,并以固定的频率来运行任务。后续任务的启动时间不受前次任务延时影响(并行)。
     * @since V1.0.0
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable job, int initialDelaySeconds, int periodSeconds) {
        ensurFixedScheduler();
        this.hasFixedJob = true;
        return fixedScheduler.scheduleAtFixedRate(job, initialDelaySeconds, periodSeconds, TimeUnit.SECONDS);
    }

    /**
     * @param job                 定期执行的任务
     * @param initialDelaySeconds 启动延迟时间
     * @param periodSeconds       每次执行任务的间隔时间(单位秒)
     * @return
     * @Title: scheduleWithFixedDelay
     * @Description: 延迟指定秒后启动,两次任务间保持固定的时间间隔(任务串行执行,前一个结束之后间隔固定时间后一个才会启动)
     * @since V1.0.0
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable job, int initialDelaySeconds, int periodSeconds) {
        ensurFixedScheduler();
        this.hasFixedJob = true;
        return fixedScheduler.scheduleWithFixedDelay(job, initialDelaySeconds, periodSeconds, TimeUnit.SECONDS);
    }

    @Override
    public boolean start() {
        //通过文件记载
        loadJobsFromConfigFile();
        //启动fixed任务
        if (this.hasFixedJob) {
            LOG.info("ScheduledThreadPoolExecutor已启动");
        }
        return true;
    }

    @Override
    public boolean stop() {
        //停止fixed任务
        if (this.hasFixedJob) {
//            //暴力强制停止
//            this.fixedScheduler.shutdownNow();
//            this.fixedScheduler = null;
            //优雅的停止
            this.fixedScheduler.shutdown();
            LOG.info("ScheduledThreadPoolExecutor开始停止");
            try {
                while (!fixedScheduler.awaitTermination(1, TimeUnit.SECONDS)) {
                    LOG.info("ScheduledThreadPlugin正在停止");
                }
            } catch (Throwable t) {
                LOG.error("ScheduledThreadPlugin停止失败");
                return false;
            }
        }
        LOG.info("ScheduledThreadPlugin已停止");
        return true;
    }

    /**
     * @Title: loadJobsFromConfigFile
     * @Description: 从配置文件汇总加载任务
     * @since V1.0.0
     */
    private void loadJobsFromConfigFile() {
        if (StrKit.isBlank(this.jobConfigFile)) {
            return;
        }
        // 获取job配置文件
        Prop jobProp = PropKit.use(this.jobConfigFile);
        // 获得所有任务名
        Set<String> jobNames = this.getJobNamesFromProp(jobProp);
        if (jobNames.isEmpty()) {
            return;
        }
        // 逐个加载任务
        for (String jobName : jobNames) {
            loadJob(jobProp, jobName);
        }
    }

    /**
     * @param jobProp job配置
     * @param jobName job名
     * @Title: loadJob
     * @Description: 加载一个任务
     * @since V1.0.0
     */
    private void loadJob(Prop jobProp, String jobName) {
        // 任务开关,默认开启
        Boolean enable = jobProp.getBoolean(jobName + ".enable", Boolean.FALSE);
        // 任务被禁用,直接返回
        if (!enable) {
            return;
        }

        // 创建要执行的任务
        Runnable runnable = createRunnableJob(jobName, jobProp.get(jobName + ".class"));
        int fixedRate = jobProp.getInt(jobName + ".fixedRate", 0);
        int fixedDelay = jobProp.getInt(jobName + ".fixedDelay", 0);
        //修改initialDelay默认值可不配置  避免比数据库启动更早
        int initialDelay = jobProp.getInt(jobName + ".initialDelay", 1);
        //参数检查
        int doubleCheckCounter = 0;
        if (fixedDelay != 0) {
            doubleCheckCounter++;
        }
        if (fixedRate != 0) {
            doubleCheckCounter++;
        }
        if (doubleCheckCounter != 1) {
            throw new RuntimeException(jobName + "的cron/fixedDelay/fixedRate需要且只能设定其中一个");
        }
        if (fixedDelay != 0) {
            this.scheduleAtFixedRate(runnable, initialDelay, fixedDelay);
            LOG.info("通过配置文件自动加载FixedRate类型定时任务( jobName=" + jobName + ", initialDelay=" + initialDelay + "'s, fixedDelay=" + fixedDelay + "'s )");
        } else {
            this.scheduleWithFixedDelay(runnable, initialDelay, fixedRate);
            LOG.info("通过配置文件自动加载FixedDelay类型定时任务( jobName=" + jobName + ", initialDelay=" + initialDelay + "'s, FixedDelay=" + fixedDelay + "'s )");
        }
    }

    /**
     * @param jobName      任务名
     * @param jobClassName 任务类名
     * @return Runnable对象
     * @Title: createRunnableJob
     * @Description: 创建任务
     * @since V1.0.0
     */
    private Runnable createRunnableJob(String jobName, String jobClassName) {
        if (jobClassName == null) {
            throw new RuntimeException("请设定 " + jobName + ".class");
        }

        Object temp = null;
        try {
            temp = Class.forName(jobClassName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException("无法实例化类: " + jobClassName, e);
        }

        Runnable job = null;
        if (temp instanceof Runnable) {
            job = (Runnable) temp;
        } else {
            throw new RuntimeException("无法实例化类: " + jobClassName
                    + ",该类必须实现Runnable接口");
        }
        return job;
    }

    /**
     * @param jobProp job配置
     * @return 任务名集合
     * @Title: getJobNamesFromProp
     * @Description: 获得所有任务名
     * @since V1.0.0
     */
    private Set<String> getJobNamesFromProp(Prop jobProp) {
        Map<String, Boolean> jobNames = new HashMap<String, Boolean>(16);
        for (Object item : jobProp.getProperties().keySet()) {
            String fullKeyName = item.toString();
            // 获得job名
            String jobName = fullKeyName.substring(0, fullKeyName.indexOf("."));
            jobNames.put(jobName, Boolean.TRUE);
        }
        return jobNames.keySet();
    }


    /**
     * @return
     * @Title: builder
     * @Description: 返回一个构建器
     * @since V1.0.0
     */
    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {
        /**
         * 调度线程池大小
         */
        private int scheduledThreadPoolSize;

        /**
         * 调度任务配置文件
         */
        private String jobConfigFile = null;


        /**
         * <p>Title: Builder</p>
         * <p>Description: 默认构造函数</p>
         *
         * @since V1.0.0
         */
        public Builder() {
            this.scheduledThreadPoolSize = this.getBestPoolSize();
        }

        /**
         * @param size
         * @return
         * @Title: scheduledThreadPoolSize
         * @Description: 配置调度线程池大小
         * @since V1.0.0
         */
        public Builder scheduledThreadPoolSize(int size) {
            this.scheduledThreadPoolSize = size;
            return this;
        }

        /**
         * @param configFile
         * @return
         * @Title: enableConfigFile
         * @Description: 使能配置文件加载
         * @since V1.0.0
         */
        public Builder enableConfigFile(String configFile) {
            this.jobConfigFile = configFile;
            return this;
        }


        /**
         * @return
         * @Title: getBestPoolSize
         * @Description: 获得调度线程池大小
         * @since V1.0.0
         */
        private int getBestPoolSize() {
            try {
                final int cores = Runtime.getRuntime().availableProcessors();
                // 每个核有8个调度线程
                return cores * 8;
            } catch (Throwable e) {
                return 8;
            }
        }

        /**
         * @return
         * @Title: build
         * @Description: 构建一个调度器插件
         * @since V1.0.0
         */
        public ScheduledThreadPlugin build() {
            return new ScheduledThreadPlugin(this.scheduledThreadPoolSize, this.jobConfigFile);
        }
    }
}

4)使用winsw注册服务正常停止

<configuration>
     <id>jfinal</id>
     <name>jfinal Service()</name>
     <description>jfinal的服务</description>
     <executable>java</executable>
     <logmode>rotate</logmode>
     <logpath>%BASE%\logs</logpath>
     <env name="MAIN_CLASS" value="com.yourpackage.YourMainClass"/>
     <env name="JAVA_OPTS" value="-Xms256m -Xmx1024m"/>
     <env name="APP_BASE_PATH" value="%BASE%" />
     <env name="CP" value="%APP_BASE_PATH%\config;%APP_BASE_PATH%\lib\*" />
     <arguments>-Xverify:none %JAVA_OPTS% -cp %CP% %MAIN_CLASS%</arguments>
     <stopparentprocessfirst>true</stopparentprocessfirst>
     <stoptimeout>-1</stoptimeout>
 </configuration>

测试

package com.csnt.source.thread;
 public class TestThread implements Runnable {
     @Override
     public void run() {
         while (true) {
         }
     }
 }


查看是否一直打印ScheduledThreadPlugin正在停止

 

项目启动  连接缓存相关插件->数据库插件->线程池插件

项目关闭  线程池插件->数据库插件->连接缓存相关插件


 

参考资料:

SchedulerPlugin项目   (https://gitee.com/myaniu/jfinal-scheduler)

Winsw参考           (http://www.jfinal.com/share/1506)

补充:

  1. mysql分布式事务运行报错           数据库连接追加&pinGlobalTxToPhysicalConnection=true


评论区

热门分享

扫码入社