activeRecord 本身已经很好用,但是最近有个需求需要导入大数据量数据到ElasticSearch,首先肯定不能一次性查询出来,然后想到使用分页查询,
但是由于mysql 大数据量分页存在性能问题(limit n,m 实际上是每次扫描前n+m行数据,然后丢弃前n行数据返回) 故此扩展Dbpro,采用边读边写的模式去处理数据,废话不多说,上石马
1.结果集处理接口
public interface RsHandler<T> extends Serializable { /** * 处理结果集<br> * 结果集处理后不需要关闭 * * @param rs 结果集 * @return 处理后生成的对象 * @throws SQLException SQL异常 */ T handle(ResultSet rs) throws SQLException; }
2.自定义MydbPro继承Dbpro
public class MyDbPro extends DbPro { public MyDbPro() { super(); } public MyDbPro(String configName) { super(configName); } /** * @param sql * @param rsHandler 结果集处理接口 需要自行实现,主要用于 * 以边读边写模式进行大数据量导出和和传输 * @param paras * @param <T> * @return */ public <T> T find(String sql, RsHandler<T> rsHandler, Object... paras) { Connection conn = null; try { conn = config.getConnection(); return find(conn, sql, rsHandler, paras); } catch (Exception e) { throw new ActiveRecordException(e); } finally { config.close(conn); } } private <T> T find(Connection conn, String sql, RsHandler<T> rsHandler, Object... paras) throws SQLException { PreparedStatement ps = null; ResultSet rs = null; try { ps = conn.prepareStatement(sql); config.getDialect().fillStatement(ps, paras); rs = ps.executeQuery(); return rsHandler.handle(rs); } finally { config.close(rs, ps, null); } } }
3.配置
//启用自定义扩展的db功能 arp.setDbProFactory(MyDbPro::new);
4.使用
Kv kv = new Kv(); MyDbPro db = (MyDbPro) Db.use(); SqlPara sql = db.getSqlPara("infoplus.getFromInstance"); db.find(sql.getSql(), (RsHandler<List<Record>>) rs -> { //每次缓存2000条到内存 rs.setFetchSize(2000); List<Record> list = new ArrayList<>(); int count = 0; while (rs.next()) { list.add(RsHandlerHelper.handleRow(rs)); count++; if (count % 2000 == 0 || rs.isLast()) { //每两千条批量插入一次 kv.set("list", list); esService.insert(kv); //清除数据 list.clear(); } } log.info("数据总量:{}", count); return null; }, sql.getPara());
其中结果集转化为record对象 的工具类RsHandlerHelper 代码就不贴了,拷贝jfinal源码 魔改下就行了
DbPro 中所有的方法都是开放的,都可以通过继承来扩展、定制、改变 jfinal 内部的数据库实现,拥有极大的灵活性
感谢楼主的分享
此外,final 方法中还少了一个在 finally 块中关闭 connection 的代码