cron4j 增加task表 支持集群 有疑问

按照之前波总的指导,修改了下调度任务,下面贴出主要代码,是否有错误?

修改Cron4jPulgin

public boolean start() {
    for (TaskInfo taskInfo : taskInfoList) {
        //将任务写入数据库task表
        Date now = new Date();
        TyleeTask task = new TyleeTask();
        task.setTaskName(taskInfo.task.toString());
        task.setCron(taskInfo.cron);
        task.setCreateDate(now);
        task.setUpdateDate(now);
        task.setStatus(Constants.TASK_INIT);
        task.setLock("0");
        task.setIP(Utils.getLocalIP());
        task.save();

        taskInfo.schedule();
    }

    try {
        Thread.sleep(new Random().nextInt(2000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    for (TaskInfo taskInfo : taskInfoList) {
        taskInfo.start();
    }
    return true;
}

建立task数据表

CREATE TABLE `xhwy`.`task` (
  `id` INT NOT NULL AUTO_INCREMENT COMMENT '',
  `task_name` VARCHAR(200) NULL COMMENT '调度任务名称',
  `cron` VARCHAR(100) NULL COMMENT '',
  `create_date` DATETIME NULL COMMENT '',
  `update_date` DATETIME NULL COMMENT '',
  `status` VARCHAR(5) NULL COMMENT '处理状态,0初始、1正处理、2成功、3失败',
  `lock` VARCHAR(5) NULL COMMENT '锁,lock 为 1 时不处理',
  PRIMARY KEY (`id`)  COMMENT '')
COMMENT = '调度任务记录表';

ALTER TABLE `xhwy`.`task`
ADD COLUMN `ip` VARCHAR(45) NULL COMMENT '' AFTER `lock`;

修改定时任务 run方法

public void run() {
    //修改订单状态
    TyleeTask task = null;
    TyleeTask otherTask = null;
    try {
        //判断此调度任务是否在其他服务器正在执行
        String tomcat1IP = PropKit.get("tomcat1IP_" + PropKit.get("flag"));
        String tomcat2IP = PropKit.get("tomcat2IP_" + PropKit.get("flag"));
        String localIP = Utils.getLocalIP();
        String className = this.getClass().getName();

        if (StringUtils.equals(tomcat1IP, localIP)) {
            otherTask = TyleeTask.dao.getInfoByIP(tomcat2IP, className);
            if (StringUtils.equals(otherTask.getLock(), "1")) {
                return;
            }
            task = TyleeTask.dao.getInfoByIP(localIP, className);
        } else if (StringUtils.equals(tomcat2IP, localIP)) {
            otherTask = TyleeTask.dao.getInfoByIP(tomcat1IP, className);
            if (StringUtils.equals(otherTask.getLock(), "1")) {
                return;
            }
            task = TyleeTask.dao.getInfoByIP(localIP, className);
        }

        //执行此调度任务时,先上锁
        task.setStatus(Constants.TASK_IMPLEMENTATION);
        task.setLock("1");
        task.setUpdateDate(new Date());
        task.update();

        checkUpdateOrder();

        //执行完调度任务,解锁
        task.setStatus(Constants.TASK_COMPLETE);
        task.setLock("0");
        task.setUpdateDate(new Date());
        task.update();

    } catch (Exception e) {
        task.setStatus(Constants.TASK_FAILURE);
        task.setLock("0");
        task.setUpdateDate(new Date());
        task.update();
        logger.error("checkUpdateOrder is error: " + e.getMessage(), e);
    }
}


现在有点疑问,应该还是哪里错了,这个定时任务,似乎还是重复执行了,14号重新发布的,几天17号去看调度数据发现:

(文本编辑不能上传图片的吗?)

# id, task_name, cron, create_date, update_date, status, lock, ip
'25', 'com.lcbp.tools.timer.SysDoCartItemOrderJob@24a5ea8d', '30 * * * *', '2016-09-14 17:59:04', '2016-09-17 10:30:00', '2', '0', '10.xx.xxx.170'
'28', 'com.lcbp.tools.timer.SysDoCartItemOrderJob@6a716e31', '30 * * * *', '2016-09-14 18:00:18', '2016-09-17 10:30:00', '2', '0', '10.xx.xxx.89'

执行时间又重合了,是不是我上面的定时任务中的run里面的方法写错了吗?

评论区

JFinal

2016-09-17 11:14

全程没有使用事务,必然会有问题,核心在于任务调度线程在获取 task 记录,并让该记录的 lock 由 0 变 1 的过程在事务中并且是排它的,核心代码大致是如下的形式:

final Ret ret = Ret.create();
boolean isOk = Db.tx(new IAtom() {
public boolean run() {
Task taskList = Db.find("select * from task where lock=0");
String idList = IdKit.join(taskList); // 将 taskList 的所有 id 变成这样:(1,2,3)
int n = Db.update("update task set lock=1 where lock=0 and id in" + idList);
ret.put("taskList", taskList);
return taskList.size() == n && n > 0; // 只有取出来的数量与更新数量相同时才可以提交事务
}
}

if (isOk) {
处理 ret.get("taskList")
} else {
未获取到可处理的 task
}

热门反馈

扫码入社