继上篇ThreadPoolKit分享的一次性任务,用完即消~
相反ScheduledKit这个工具类是执行重复性任务!与Cron4jPlugin类似,但又不太一样,无依赖、轻量级、玩法多 的特点。
话不多说!上 石马 ~
/**
* 调度工具类
* <pre>
* 1:scheduleWithFixedDelay 以上一次任务的 "结束时间" 为间隔调度任务
* 2:scheduleAtFixedRate 以上一次任务的 "开始时间" 为间隔调度任务。当本次调度来临时,如果上一次任务未执行完,则等待它执行完成后再立即调度
* 3:警告:必须要在被调度的任务(Runnable/Callable)中捕获异常,否则调度将会停止
* </pre>
*/
public class ScheduledKit {
private static ScheduledExecutorService executor = null;
private ScheduledKit() {}
/**
* 初始化
*
* @param corePoolSize the number of threads to keep in the pool, even if they are idle
*/
public synchronized static void init(int corePoolSize) {
if (executor == null) {
executor = Executors.newScheduledThreadPool(corePoolSize);
} else {
Log.getLog(ScheduledKit.class).warn(ScheduledKit.class.getName() + " 已经初始化");
}
}
/**
* 传递 ScheduledExecutorService 对象进行初始化,从而完全掌控线程池参数
*/
public synchronized static void init(ScheduledExecutorService executor) {
if (ScheduledKit.executor == null) {
ScheduledKit.executor = executor;
} else {
Log.getLog(ScheduledKit.class).warn(ScheduledKit.class.getName() + " 已经初始化");
}
}
public static ScheduledExecutorService getExecutor() {
if (executor == null) {
init(5);
}
return executor;
}
/**
* 以固定延迟执行任务
* @param initialDelay 第一次启动前的延迟
* @param delay 上次任务 "完成" 时间与本次任务 "开始" 时间的间隔
* @param unit 时间单位
* @param task 被执行的任务
*/
public static ScheduledFuture<?> scheduleWithFixedDelay(long initialDelay, long delay, TimeUnit unit, Runnable task) {
return getExecutor().scheduleWithFixedDelay(task, initialDelay, delay, unit);
}
/**
* 任务添加 try catch ,避免 scheduleWithFixedDelay 方法在调度任务出现异常后会终止调度
*/
public static ScheduledFuture<?> scheduleWithFixedDelayWithTryCatch(long initialDelay, long delay, TimeUnit unit, Runnable task) {
return scheduleWithFixedDelay(initialDelay, delay, unit, () -> {
try {
task.run();
} catch (Throwable t) {
Log.getLog(ScheduledKit.class).error(t.getMessage(), t);
}
});
}
/**
* 以固定频率执行任务
* @param initialDelay 第一次启动前的延迟
* @param period 上次任务 "开始" 时间与本次任务 "开始" 时间的间隔,如果任务执行时长超出 period 值,则在任务执行完成后立即调度任务执行
* @param unit 时间单位
* @param task 被执行的任务
*/
public static ScheduledFuture<?> scheduleAtFixedRate(long initialDelay, long period, TimeUnit unit, Runnable task) {
return getExecutor().scheduleAtFixedRate(task, initialDelay, period, unit);
}
/**
* 任务添加 try catch ,避免 scheduleAtFixedRate 方法在调度任务出现异常后会终止调度
*/
public static ScheduledFuture<?> scheduleAtFixedRateWithTryCatch(long initialDelay, long period, TimeUnit unit, Runnable task) {
return scheduleAtFixedRate(initialDelay, period, unit, () -> {
try {
task.run();
} catch (Throwable t) {
Log.getLog(ScheduledKit.class).error(t.getMessage(), t);
}
});
}
/**
* 创建一次性调度,在给定的 delay 时间后调度
* @param delay 从现在开始的延迟时间
* @param unit 时间单位
* @param task 被执行任务
*/
public static ScheduledFuture<?> schedule(long delay, TimeUnit unit, Runnable task) {
return getExecutor().schedule(task, delay, unit);
}
/**
* 创建一次性调度,在给定的 delay 时间后调度
* @param delay 从现在开始的延迟时间
* @param unit 时间单位
* @param task 被执行任务
*/
public static <V> ScheduledFuture<V> schedule(long delay, TimeUnit unit, Callable<V> task) {
return getExecutor().schedule(task, delay, unit);
}
/**
* 等待正在执行的线程执行完毕以后,关闭线程池
*/
public static void shutdown() {
if (executor != null) {
executor.shutdown();
}
}
/**
* 停掉正在执行的线程,关闭线程池
*/
public static void shutdownNow() {
if (executor != null) {
executor.shutdownNow();
}
}
/**
* 在 shutdown 线程池之后,阻塞等待所有任务执行完,或发生超时,或当前线程中断,以先发生者为准
*/
public static boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executor == null || executor.awaitTermination(timeout, unit);
}
}源码依然简洁明了,以及中文注释也写的非常清楚!
好了, 接下来分享 JDK21 下的 虚拟线程 配置方式:
在 JFinalConfig 子类:
@Override
public void onStart() {
//设置为 jdk21+ 虚拟线程,50 这个数量根据业务数量来设置更好,数量少时不要用虚拟线程,没必要做池化
ScheduledKit.init(Executors.newScheduledThreadPool(50, Thread.ofVirtual().factory()));
}
@Override
public void onStop() {
//等待正在执行的线程执行完毕以后,关闭线程池
ScheduledKit.shutdown();
try {
ScheduledKit.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LogKit.error("任务没处理完,超时关闭:", e);
}
}在 onStart 里面配置,在onStop里面等待任务执行完毕后关闭。
业务上几种使用方式就是 :
以固定延迟执行任务》scheduleWithFixedDelayWithTryCatch
以固定频率执行任务》scheduleAtFixedRateWithTryCatch
创建一次性调度,在给定的 delay 时间后调度》schedule
我们的业务场景:
需求:有一个数据库业务列队表,需要按顺序执行多少个。业务是“抢”报名,每个班里面人数有限,然后放开了抢。由ScheduledKit以固定延迟执行任务进行取当前没处理的数据集,然后进行一些业务逻辑对数据标记是否抢成功了,并WX消息告知学员结果。
还有很多场景,这里先不一一分享了。
ScheduledKit 与 ThreadPoolKit 一般搭配使用效果更佳。
ScheduledKit用单一任务不断的取动态数据,可做到类似数据库列队的业务,取到先后顺序再由ThreadPoolKit欻欻的执行业务独立逻辑!
一静一动、静中取动、动中有静 一套组合拳!
就是一个字!“绝”~![]()
ScheduledKit.init(Executors.newScheduledThreadPool(50, Thread.ofVirtual().factory()));
ScheduledKit 已用于爬虫系统。 点赞 + 收藏