JFinal使用技巧-ScheduledKit的使用

继上篇ThreadPoolKit分享的一次性任务,用完即消~ 
相反ScheduledKit这个工具类是执行重复性任务!与Cron4jPlugin类似,但又不太一样,无依赖、轻量级、玩法多 的特点。
话不多说!上 石马 ~

/**
 * 调度工具类
 * <pre>
 * 1:scheduleWithFixedDelay 以上一次任务的 "结束时间" 为间隔调度任务
 * 2:scheduleAtFixedRate    以上一次任务的 "开始时间" 为间隔调度任务。当本次调度来临时,如果上一次任务未执行完,则等待它执行完成后再立即调度
 * 3:警告:必须要在被调度的任务(Runnable/Callable)中捕获异常,否则调度将会停止
 * </pre>
 */
public class ScheduledKit {

    private static ScheduledExecutorService executor = null;

    private ScheduledKit() {}

    /**
     * 初始化
     *
     * @param corePoolSize the number of threads to keep in the pool, even if they are idle
     */
    public synchronized static void init(int corePoolSize) {
       if (executor == null) {
          executor = Executors.newScheduledThreadPool(corePoolSize);
       } else {
          Log.getLog(ScheduledKit.class).warn(ScheduledKit.class.getName() + " 已经初始化");
       }
    }

    /**
     * 传递 ScheduledExecutorService 对象进行初始化,从而完全掌控线程池参数
     */
    public synchronized static void init(ScheduledExecutorService executor) {
       if (ScheduledKit.executor == null) {
          ScheduledKit.executor = executor;
       } else {
          Log.getLog(ScheduledKit.class).warn(ScheduledKit.class.getName() + " 已经初始化");
       }
    }

    public static ScheduledExecutorService getExecutor() {
       if (executor == null) {
          init(5);
       }
       return executor;
    }

    /**
     * 以固定延迟执行任务
     * @param initialDelay 第一次启动前的延迟
     * @param delay 上次任务 "完成" 时间与本次任务 "开始" 时间的间隔
     * @param unit 时间单位
     * @param task 被执行的任务
     */
    public static ScheduledFuture<?> scheduleWithFixedDelay(long initialDelay, long delay, TimeUnit unit, Runnable task) {
       return getExecutor().scheduleWithFixedDelay(task, initialDelay, delay, unit);
    }

    /**
     * 任务添加 try catch ,避免 scheduleWithFixedDelay 方法在调度任务出现异常后会终止调度
     */
    public static ScheduledFuture<?> scheduleWithFixedDelayWithTryCatch(long initialDelay, long delay, TimeUnit unit, Runnable task) {
       return scheduleWithFixedDelay(initialDelay, delay, unit, () -> {
          try {
             task.run();
          } catch (Throwable t) {
             Log.getLog(ScheduledKit.class).error(t.getMessage(), t);
          }
       });
    }

    /**
     * 以固定频率执行任务
     * @param initialDelay 第一次启动前的延迟
     * @param period 上次任务 "开始" 时间与本次任务 "开始" 时间的间隔,如果任务执行时长超出 period 值,则在任务执行完成后立即调度任务执行
     * @param unit 时间单位
     * @param task 被执行的任务
     */
    public static ScheduledFuture<?> scheduleAtFixedRate(long initialDelay, long period, TimeUnit unit, Runnable task) {
       return getExecutor().scheduleAtFixedRate(task, initialDelay, period, unit);
    }

    /**
     * 任务添加 try catch ,避免 scheduleAtFixedRate 方法在调度任务出现异常后会终止调度
     */
    public static ScheduledFuture<?> scheduleAtFixedRateWithTryCatch(long initialDelay, long period, TimeUnit unit, Runnable task) {
       return scheduleAtFixedRate(initialDelay, period, unit, () -> {
          try {
             task.run();
          } catch (Throwable t) {
             Log.getLog(ScheduledKit.class).error(t.getMessage(), t);
          }
       });
    }

    /**
     * 创建一次性调度,在给定的 delay 时间后调度
     * @param delay 从现在开始的延迟时间
     * @param unit 时间单位
     * @param task 被执行任务
     */
    public static ScheduledFuture<?> schedule(long delay, TimeUnit unit, Runnable task) {
       return getExecutor().schedule(task, delay, unit);
    }

    /**
     * 创建一次性调度,在给定的 delay 时间后调度
     * @param delay 从现在开始的延迟时间
     * @param unit 时间单位
     * @param task 被执行任务
     */
    public static <V> ScheduledFuture<V> schedule(long delay, TimeUnit unit, Callable<V> task) {
       return getExecutor().schedule(task, delay, unit);
    }

    /**
     * 等待正在执行的线程执行完毕以后,关闭线程池
     */
    public static void shutdown() {
       if (executor != null) {
          executor.shutdown();
       }
    }

    /**
     * 停掉正在执行的线程,关闭线程池
     */
    public static void shutdownNow() {
       if (executor != null) {
          executor.shutdownNow();
       }
    }

    /**
     * 在 shutdown 线程池之后,阻塞等待所有任务执行完,或发生超时,或当前线程中断,以先发生者为准
     */
    public static boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
       return executor == null || executor.awaitTermination(timeout, unit);
    }
}


源码依然简洁明了,以及中文注释也写的非常清楚!

好了, 接下来分享 JDK21 下的 虚拟线程 配置方式:
在  JFinalConfig 子类:

@Override
public void onStart() {
    //设置为 jdk21+ 虚拟线程,50 这个数量根据业务数量来设置更好,数量少时不要用虚拟线程,没必要做池化
    ScheduledKit.init(Executors.newScheduledThreadPool(50, Thread.ofVirtual().factory()));
}

@Override
public void onStop() {
    //等待正在执行的线程执行完毕以后,关闭线程池
    ScheduledKit.shutdown();
    try {
       ScheduledKit.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
       LogKit.error("任务没处理完,超时关闭:", e);
    }
}

在 onStart 里面配置,在onStop里面等待任务执行完毕后关闭。

业务上几种使用方式就是 :

  1. 以固定延迟执行任务》scheduleWithFixedDelayWithTryCatch

  2. 以固定频率执行任务》scheduleAtFixedRateWithTryCatch

  3. 创建一次性调度,在给定的 delay 时间后调度》schedule

我们的业务场景:

需求:有一个数据库业务列队表,需要按顺序执行多少个。业务是“抢”报名,每个班里面人数有限,然后放开了抢。由ScheduledKit以固定延迟执行任务进行取当前没处理的数据集,然后进行一些业务逻辑对数据标记是否抢成功了,并WX消息告知学员结果。
还有很多场景,这里先不一一分享了。
ScheduledKit 与 ThreadPoolKit 一般搭配使用效果更佳。
ScheduledKit用单一任务不断的取动态数据,可做到类似数据库列队的业务,取到先后顺序再由ThreadPoolKit欻欻的执行业务独立逻辑!
一静一动、静中取动、动中有静 一套组合拳!
image.png就是一个字!“绝”~11111.png

评论区

JFinal

2024-09-20 00:07

虚拟线程用法妙不可言:
ScheduledKit.init(Executors.newScheduledThreadPool(50, Thread.ofVirtual().factory()));

ScheduledKit 已用于爬虫系统。 点赞 + 收藏

杜福忠

2024-09-20 09:07

探索@JFinal 无尽多巴胺~

JFinal

2024-09-20 09:53

@杜福忠 jfinal 几乎每行代码都有原因、考究, 围绕极简与便利一切为了开发者

越探索越愉悦

热门分享

扫码入社