activeRecord 本身已经很好用,但是最近有个需求需要导入大数据量数据到ElasticSearch,首先肯定不能一次性查询出来,然后想到使用分页查询,
但是由于mysql 大数据量分页存在性能问题(limit n,m 实际上是每次扫描前n+m行数据,然后丢弃前n行数据返回) 故此扩展Dbpro,采用边读边写的模式去处理数据,废话不多说,上石马
1.结果集处理接口
public interface RsHandler<T> extends Serializable {
/**
* 处理结果集<br>
* 结果集处理后不需要关闭
*
* @param rs 结果集
* @return 处理后生成的对象
* @throws SQLException SQL异常
*/
T handle(ResultSet rs) throws SQLException;
}2.自定义MydbPro继承Dbpro
public class MyDbPro extends DbPro {
public MyDbPro() {
super();
}
public MyDbPro(String configName) {
super(configName);
}
/**
* @param sql
* @param rsHandler 结果集处理接口 需要自行实现,主要用于
* 以边读边写模式进行大数据量导出和和传输
* @param paras
* @param <T>
* @return
*/
public <T> T find(String sql, RsHandler<T> rsHandler, Object... paras) {
Connection conn = null;
try {
conn = config.getConnection();
return find(conn, sql, rsHandler, paras);
} catch (Exception e) {
throw new ActiveRecordException(e);
} finally {
config.close(conn);
}
}
private <T> T find(Connection conn, String sql, RsHandler<T> rsHandler, Object... paras) throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
ps = conn.prepareStatement(sql);
config.getDialect().fillStatement(ps, paras);
rs = ps.executeQuery();
return rsHandler.handle(rs);
} finally {
config.close(rs, ps, null);
}
}
}3.配置
//启用自定义扩展的db功能 arp.setDbProFactory(MyDbPro::new);
4.使用
Kv kv = new Kv();
MyDbPro db = (MyDbPro) Db.use();
SqlPara sql = db.getSqlPara("infoplus.getFromInstance");
db.find(sql.getSql(), (RsHandler<List<Record>>) rs -> {
//每次缓存2000条到内存
rs.setFetchSize(2000);
List<Record> list = new ArrayList<>();
int count = 0;
while (rs.next()) {
list.add(RsHandlerHelper.handleRow(rs));
count++;
if (count % 2000 == 0 || rs.isLast()) {
//每两千条批量插入一次
kv.set("list", list);
esService.insert(kv);
//清除数据
list.clear();
}
}
log.info("数据总量:{}", count);
return null;
}, sql.getPara());其中结果集转化为record对象 的工具类RsHandlerHelper 代码就不贴了,拷贝jfinal源码 魔改下就行了
DbPro 中所有的方法都是开放的,都可以通过继承来扩展、定制、改变 jfinal 内部的数据库实现,拥有极大的灵活性
感谢楼主的分享
此外,final 方法中还少了一个在 finally 块中关闭 connection 的代码