前言
加入jfinal club已经一个多月了,学到了很多,但总是向别人索取、自己却没有付出,这种状态不是我想要的。本人实力有限,也想尽可能贡献自己的绵薄之力,这次就抛砖引玉,分享一下Quartz定时任务的Redis集群方案。该方案参考了社区之前有人提供的QuartzPlugin,以及波总在cron4j中解答的如何实现cron4j集群的方案
为什么要做redis集群方案
有人会说Quartz已经有一套成熟的集群方案了,但是那个是数据库版本,而且配置我也觉得挺麻烦,我不喜欢直连数据库,感觉很费资源
QuartzPlugin我就不贴出来了,之前就有人分享过
主要实现的接口,里面用到的一些redis操作是自己写的RedisCache,模仿jfinal ehcache的写法
主要流程
package com.jfinalshop.task; import com.jfinal.ext2.kit.JsonExtKit; import com.jfinal.kit.JsonKit; import com.jfinal.kit.StrKit; import com.jfinalshop.util.IQuartzJobLoader; import com.jfinalshop.util.RedisCacheKit; import org.apache.commons.collections.map.HashedMap; import org.quartz.Job; import org.quartz.JobExecutionContext; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created by little fish on 2017/8/5. */ public abstract class RedisJob implements Job { private final String JOB_CACHE_PREFIX = "quartz_job_"; private final String JOB_LOCK_KEY = "isLock"; private final String JOB_RUN_COUNT_KEY = "runCount"; private final String JOB_FAIL_COUNT_KEY = "failCount"; private final String JOB_FAIL_MESSAGE_KEY = "failMessage"; private final String JOB_LAST_FIRE_TIME_KEY = "lastFireTime"; private final Integer JOB_LOCK_TRUE = 1; private final Integer JOB_LOCK_FALSE = 0; private static ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<String, ReentrantLock>(); private ReentrantLock getLock(String key) { ReentrantLock lock = lockMap.get(key); if (lock != null) return lock; lock = new ReentrantLock(); ReentrantLock previousLock = lockMap.putIfAbsent(key, lock); return previousLock == null ? 
lock : previousLock; } private String genJobCacheName(JobExecutionContext jobExecutionContext){ return String.format("%s%s", JOB_CACHE_PREFIX, jobExecutionContext.getJobInstance().getClass().getSimpleName()); } private Map<Object, Object> genResetJobCacheMap(int runCount, Long fireTime,int failCount, String failMessage){ Map<Object, Object> loadData = new HashedMap() ; loadData.put(JOB_LOCK_KEY, JOB_LOCK_FALSE);//去锁 loadData.put(JOB_LAST_FIRE_TIME_KEY, fireTime);//同步最后一次触发时间 loadData.put(JOB_RUN_COUNT_KEY, runCount++);//运行次数加1 loadData.put(JOB_FAIL_COUNT_KEY, failCount);//错误次数 loadData.put(JOB_FAIL_MESSAGE_KEY, failMessage);//错误信息 return loadData; } public boolean checkJobValid(JobExecutionContext jobExecutionContext, IQuartzJobLoader jobLoader){ String cacheName = genJobCacheName(jobExecutionContext); Integer isLock = RedisCacheKit.hget(cacheName, JOB_LOCK_KEY); if(isLock == JOB_LOCK_TRUE){ return false; } Lock lock = getLock(cacheName); lock.lock(); //初始化一些默认值 Integer runCount = 0; Integer failCount = 0; Long lastFireTime = 0L; String failMessageJsonStr = ""; //实际插入redis的数据 Map<Object, Object> loadData = null; try { //再次拿出数据做double check检查并发 Map<Object, Object> redisCacheData = RedisCacheKit.hgetAll(cacheName); isLock = (Integer)redisCacheData.get(JOB_LOCK_KEY); lastFireTime = (Long)redisCacheData.get(JOB_LAST_FIRE_TIME_KEY); lastFireTime = lastFireTime != null?lastFireTime:0; runCount = (Integer)redisCacheData.get(JOB_RUN_COUNT_KEY); runCount = runCount != null?runCount:0; failCount = (Integer)redisCacheData.get(JOB_FAIL_COUNT_KEY); failCount = failCount != null?failCount:0; failMessageJsonStr = (String) redisCacheData.get(JOB_FAIL_MESSAGE_KEY); failMessageJsonStr = failMessageJsonStr != null?failMessageJsonStr:""; //检查是否满足执行条件 if ((isLock == null || isLock == JOB_LOCK_FALSE) && lastFireTime < jobExecutionContext.getFireTime().getTime()) { //Redis加锁 RedisCacheKit.hset(cacheName, JOB_LOCK_KEY, JOB_LOCK_TRUE); //处理Task主要业务逻辑 jobLoader.load(); 
//处理成功后把最新的状态同步到Redis中 loadData = genResetJobCacheMap(JOB_LOCK_FALSE, jobExecutionContext.getFireTime().getTime(), failCount, failMessageJsonStr); } return true; }catch (Exception e){ e.printStackTrace(); failCount++;//失败次数+1 //添加失败信息以便之后检查 List<RedisJobFailMessage> faileMessageArray = null; if(StrKit.isBlank(failMessageJsonStr)){ faileMessageArray = new ArrayList<RedisJobFailMessage>(); }else{ try { faileMessageArray = JsonExtKit.jsonToJSONArray(failMessageJsonStr).toJavaList(RedisJobFailMessage.class); }catch (Exception ex){ ex.printStackTrace(); } if(faileMessageArray == null){ faileMessageArray = new ArrayList<RedisJobFailMessage>(); } } faileMessageArray.add(new RedisJobFailMessage(lastFireTime, e.getMessage())); failMessageJsonStr = JsonKit.toJson(faileMessageArray); loadData = genResetJobCacheMap(JOB_LOCK_FALSE, jobExecutionContext.getFireTime().getTime(), failCount, failMessageJsonStr); }finally { RedisCacheKit.hmsetForever(cacheName, loadData); lock.unlock(); } return false; } }
一个Loader接口其实就是和IDataLoader一模一样
package com.jfinalshop.util; /** * Created by little fish on 2017/7/16. */ public interface IQuartzJobLoader { public void load(Object... args); }
最后调用
package com.jfinalshop.task; import com.jfinal.kit.LogKit; import com.jfinalshop.util.IQuartzJobLoader; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; /** * Created by little fish on 2017/8/5. */ public class TaskTest extends RedisJob { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { this.checkJobValid(jobExecutionContext, new IQuartzJobLoader() { @Override public void load(Object... args) { //验证通过执行任务 LogKit.info(String.format("getClass = %s fireTime = %s runtime = %s nextFireTime = %s result = %s",jobExecutionContext.getJobInstance().getClass().getSimpleName(),jobExecutionContext.getFireTime(),jobExecutionContext.getJobRunTime(),jobExecutionContext.getNextFireTime(),jobExecutionContext.getResult())); } }); } }
刚写完的,代码写的很粗糙,只是做过冒烟测试,可以正常执行,没有做过优化