扩展DbPro功能 用于处理大数据量的导出和同步

activeRecord 本身已经很好用,但是最近有个需求需要导入大数据量数据到ElasticSearch,首先肯定不能一次性查询出来,然后想到使用分页查询,

但是由于mysql 大数据量分页存在性能问题(limit n,m 实际上是每次扫描前n+m行数据,然后丢弃前n行数据返回) 故此扩展Dbpro,采用边读边写的模式去处理数据,废话不多说,上石马

1.结果集处理接口

public interface RsHandler<T> extends Serializable {

    /**
     * 处理结果集<br>
     * 结果集处理后不需要关闭
     *
     * @param rs 结果集
     * @return 处理后生成的对象
     * @throws SQLException SQL异常
     */
    T handle(ResultSet rs) throws SQLException;
}

2.自定义MydbPro继承Dbpro

public class MyDbPro extends DbPro {
    public MyDbPro() {
        super();
    }

    public MyDbPro(String configName) {
        super(configName);
    }

    /**
     * @param sql
     * @param rsHandler 结果集处理接口 需要自行实现,主要用于
     *                  以边读边写模式进行大数据量导出和和传输
     * @param paras
     * @param <T>
     * @return
     */
    public <T> T find(String sql, RsHandler<T> rsHandler, Object... paras) {
        Connection conn = null;
        try {
            conn = config.getConnection();
            return find(conn, sql, rsHandler, paras);
        } catch (Exception e) {
            throw new ActiveRecordException(e);
        } finally {
            config.close(conn);
        }
    }

    private <T> T find(Connection conn, String sql, RsHandler<T> rsHandler, Object... paras) throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            ps = conn.prepareStatement(sql);
            config.getDialect().fillStatement(ps, paras);
            rs = ps.executeQuery();
            return rsHandler.handle(rs);
        } finally {
            config.close(rs, ps, null);
        }
    }
}

3.配置

//启用自定义扩展的db功能
arp.setDbProFactory(MyDbPro::new);

4.使用

Kv kv = new Kv();
MyDbPro db = (MyDbPro) Db.use();
SqlPara sql = db.getSqlPara("infoplus.getFromInstance");
db.find(sql.getSql(), (RsHandler<List<Record>>) rs -> {
    //每次缓存2000条到内存
    rs.setFetchSize(2000);
    List<Record> list = new ArrayList<>();
    int count = 0;
    while (rs.next()) {
        list.add(RsHandlerHelper.handleRow(rs));
        count++;
        if (count % 2000 == 0 || rs.isLast()) {
            //每两千条批量插入一次
            kv.set("list", list);
            esService.insert(kv);
            //清除数据
            list.clear();
        }
    }
    log.info("数据总量:{}", count);
    return null;
}, sql.getPara());

其中结果集转化为record对象 的工具类RsHandlerHelper 代码就不贴了,拷贝jfinal源码 魔改下就行了 

评论区

JFinal

2019-10-27 16:40

很少有人会扩展 DbPro, 扩展 DbPro 的同学都是研究过 jfinal 源码的

DbPro 中所有的方法都是开放的,都可以通过继承来扩展、定制、改变 jfinal 内部的数据库实现,拥有极大的灵活性

感谢楼主的分享

此外,final 方法中还少了一个在 finally 块中关闭 connection 的代码

chcode

2019-10-27 17:00

@JFinal 下面一个方法有关闭,上面的find 还需要写关闭代码吗?

杜福忠

2019-10-27 21:37

2333喜欢这个 上“石马”,记得波大说过,最好是 哪里取的Connection,就在哪里关闭。

JFinal

2019-10-27 21:43

@chcode 你的这个方法是 public ,其他人可以直接调用该方法

原则上是谁获取谁关闭,几乎没有特例

chcode

2019-10-28 10:19

@JFinal 就算是public 应该也没关系啊,里面调用的一个重载方法是可以将连接关闭的啊,我测试结果也是这样,当然多关闭一次也可以

JFinal

2019-10-28 11:50

@chcode 如果不坚持谁打开谁关闭的原则,在代码演化一段时间以后容易出现资源泄漏的 bug

打开连接的方法,将关闭的任务交给别一个方法,这相当于是跨方法的一个约定,这个约定是脆弱的,容易遗忘的,在代码演化一段时间以后容易出现资源泄漏的 bug

chcode

2019-10-28 11:53

@JFinal 明白了,谢谢波总

chcode

2019-10-28 12:08

@JFinal 代码已更新,感谢波总悉心指导

JFinal

2019-10-28 12:15

@chcode 感谢分享

热门分享

扫码入社