继上篇ThreadPoolKit分享的一次性任务,用完即消~
相反ScheduledKit这个工具类是执行重复性任务!与Cron4jPlugin类似,但又不太一样,无依赖、轻量级、玩法多 的特点。
话不多说!上 石马 ~
/** * 调度工具类 * <pre> * 1:scheduleWithFixedDelay 以上一次任务的 "结束时间" 为间隔调度任务 * 2:scheduleAtFixedRate 以上一次任务的 "开始时间" 为间隔调度任务。当本次调度来临时,如果上一次任务未执行完,则等待它执行完成后再立即调度 * 3:警告:必须要在被调度的任务(Runnable/Callable)中捕获异常,否则调度将会停止 * </pre> */ public class ScheduledKit { private static ScheduledExecutorService executor = null; private ScheduledKit() {} /** * 初始化 * * @param corePoolSize the number of threads to keep in the pool, even if they are idle */ public synchronized static void init(int corePoolSize) { if (executor == null) { executor = Executors.newScheduledThreadPool(corePoolSize); } else { Log.getLog(ScheduledKit.class).warn(ScheduledKit.class.getName() + " 已经初始化"); } } /** * 传递 ScheduledExecutorService 对象进行初始化,从而完全掌控线程池参数 */ public synchronized static void init(ScheduledExecutorService executor) { if (ScheduledKit.executor == null) { ScheduledKit.executor = executor; } else { Log.getLog(ScheduledKit.class).warn(ScheduledKit.class.getName() + " 已经初始化"); } } public static ScheduledExecutorService getExecutor() { if (executor == null) { init(5); } return executor; } /** * 以固定延迟执行任务 * @param initialDelay 第一次启动前的延迟 * @param delay 上次任务 "完成" 时间与本次任务 "开始" 时间的间隔 * @param unit 时间单位 * @param task 被执行的任务 */ public static ScheduledFuture<?> scheduleWithFixedDelay(long initialDelay, long delay, TimeUnit unit, Runnable task) { return getExecutor().scheduleWithFixedDelay(task, initialDelay, delay, unit); } /** * 任务添加 try catch ,避免 scheduleWithFixedDelay 方法在调度任务出现异常后会终止调度 */ public static ScheduledFuture<?> scheduleWithFixedDelayWithTryCatch(long initialDelay, long delay, TimeUnit unit, Runnable task) { return scheduleWithFixedDelay(initialDelay, delay, unit, () -> { try { task.run(); } catch (Throwable t) { Log.getLog(ScheduledKit.class).error(t.getMessage(), t); } }); } /** * 以固定频率执行任务 * @param initialDelay 第一次启动前的延迟 * @param period 上次任务 "开始" 时间与本次任务 "开始" 时间的间隔,如果任务执行时长超出 period 值,则在任务执行完成后立即调度任务执行 * @param unit 时间单位 * @param task 被执行的任务 */ public static ScheduledFuture<?> scheduleAtFixedRate(long initialDelay, long period, TimeUnit unit, Runnable task) { return getExecutor().scheduleAtFixedRate(task, initialDelay, period, unit); } /** * 任务添加 try catch ,避免 scheduleAtFixedRate 方法在调度任务出现异常后会终止调度 */ public static ScheduledFuture<?> scheduleAtFixedRateWithTryCatch(long initialDelay, long period, TimeUnit unit, Runnable task) { return scheduleAtFixedRate(initialDelay, period, unit, () -> { try { task.run(); } catch (Throwable t) { Log.getLog(ScheduledKit.class).error(t.getMessage(), t); } }); } /** * 创建一次性调度,在给定的 delay 时间后调度 * @param delay 从现在开始的延迟时间 * @param unit 时间单位 * @param task 被执行任务 */ public static ScheduledFuture<?> schedule(long delay, TimeUnit unit, Runnable task) { return getExecutor().schedule(task, delay, unit); } /** * 创建一次性调度,在给定的 delay 时间后调度 * @param delay 从现在开始的延迟时间 * @param unit 时间单位 * @param task 被执行任务 */ public static <V> ScheduledFuture<V> schedule(long delay, TimeUnit unit, Callable<V> task) { return getExecutor().schedule(task, delay, unit); } /** * 等待正在执行的线程执行完毕以后,关闭线程池 */ public static void shutdown() { if (executor != null) { executor.shutdown(); } } /** * 停掉正在执行的线程,关闭线程池 */ public static void shutdownNow() { if (executor != null) { executor.shutdownNow(); } } /** * 在 shutdown 线程池之后,阻塞等待所有任务执行完,或发生超时,或当前线程中断,以先发生者为准 */ public static boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return executor == null || executor.awaitTermination(timeout, unit); } }
源码依然简洁明了,以及中文注释也写的非常清楚!
好了, 接下来分享 JDK21 下的 虚拟线程 配置方式:
在 JFinalConfig 子类:
@Override public void onStart() { //设置为 jdk21+ 虚拟线程,50 这个数量根据业务数量来设置更好,数量少时不要用虚拟线程,没必要做池化 ScheduledKit.init(Executors.newScheduledThreadPool(50, Thread.ofVirtual().factory())); } @Override public void onStop() { //等待正在执行的线程执行完毕以后,关闭线程池 ScheduledKit.shutdown(); try { ScheduledKit.awaitTermination(1, TimeUnit.MINUTES); } catch (InterruptedException e) { LogKit.error("任务没处理完,超时关闭:", e); } }
在 onStart 里面配置,在onStop里面等待任务执行完毕后关闭。
业务上几种使用方式就是 :
以固定延迟执行任务》scheduleWithFixedDelayWithTryCatch
以固定频率执行任务》scheduleAtFixedRateWithTryCatch
创建一次性调度,在给定的 delay 时间后调度》schedule
我们的业务场景:
需求:有一个数据库业务列队表,需要按顺序执行多少个。业务是“抢”报名,每个班里面人数有限,然后放开了抢。由ScheduledKit以固定延迟执行任务进行取当前没处理的数据集,然后进行一些业务逻辑对数据标记是否抢成功了,并WX消息告知学员结果。
还有很多场景,这里先不一一分享了。
ScheduledKit 与 ThreadPoolKit 一般搭配使用效果更佳。
ScheduledKit用单一任务不断的取动态数据,可做到类似数据库列队的业务,取到先后顺序再由ThreadPoolKit欻欻的执行业务独立逻辑!
一静一动、静中取动、动中有静 一套组合拳!
就是一个字!“绝”~
ScheduledKit.init(Executors.newScheduledThreadPool(50, Thread.ofVirtual().factory()));
ScheduledKit 已用于爬虫系统。 点赞 + 收藏