jfinal 独立使用activeRecord进行数据的批量插入
[背景]
将水位数据从源数据库中查询出来,处理后插入到目标数据库。
源数据库数据格式 tableName是 water_level
目标数据库格式
表名是river_level
SELECT * FROM river_level LIMIT 10
两个数据库的数据表的time字段格式不同
第一步,在本地创建目标数据库和表
-- Create the target database used by the importer.
CREATE DATABASE cjwb DEFAULT CHARACTER SET utf8;
USE cjwb;

-- Create the application accounts first and grant afterwards:
-- "GRANT ... IDENTIFIED BY" was removed in MySQL 8.0, while
-- CREATE USER IF NOT EXISTS works on MySQL 5.7.6+ and 8.0.
CREATE USER IF NOT EXISTS 'cjwb'@'127.0.0.1' IDENTIFIED BY 'xxx';
CREATE USER IF NOT EXISTS 'cjwb'@'localhost' IDENTIFIED BY 'xxx';
GRANT ALL PRIVILEGES ON cjwb.* TO 'cjwb'@'127.0.0.1';
GRANT ALL PRIVILEGES ON cjwb.* TO 'cjwb'@'localhost';
FLUSH PRIVILEGES;

-- Target table for the imported water levels.
CREATE TABLE `river_level` (
  `id` VARCHAR(255) NOT NULL,
  `level` DECIMAL(19,2) DEFAULT NULL,
  `site_name` VARCHAR(255) DEFAULT NULL,
  `time` DATETIME DEFAULT NULL,
  `name` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  -- The importer checks for existing rows by (site_name, time) once per
  -- candidate row; without this index every check is a full table scan.
  KEY `idx_site_name_time` (`site_name`, `time`)
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
第二步:编写代码
package com.esunward.spider.service;

import java.util.ArrayList;
import java.util.List;

import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.druid.DruidPlugin;
import com.litong.utils.string.UUIDUtils;

import lombok.extern.slf4j.Slf4j;

/**
 * Copies water-level rows from the {@code water_level} table of the source
 * database into the {@code river_level} table of the target database, using
 * JFinal ActiveRecord outside of a web container.
 *
 * @author litong
 * @date 2020-09-22 22:38:45
 * @version 1.0
 * @desc Queries the required data from the water-level app database.
 */
@Slf4j
public class WaterLevelImprotService {

  /** Rows sent to the target database per JDBC batch. */
  private static final int BATCH_SIZE = 1000;

  /** Only source rows whose {@code time} is later than this date are imported. */
  private static final String SOURCE_START_DATE = "2020-08-15";

  /**
   * The date is bound as a parameter. Written inline and unquoted
   * ({@code time>2020-08-15}) MySQL evaluates it as the arithmetic
   * expression 2020 - 8 - 15 = 1997, so the intended date filter is lost.
   */
  private static final String SELECT_SOURCE_SQL = "select * from water_level where time>?";

  private static final String COUNT_TARGET_SQL =
      "select count(*) from river_level where site_name=? and time=?";

  /** Each source day is expanded into one morning and one afternoon reading. */
  private static final String[] TIME_OF_DAY_SUFFIXES = { " 08:00:00", " 17:00:00" };

  private String ds1 = "datasource1";
  private String ds2 = "datasource2";

  // NOTE(review): connection settings are hard-coded: { jdbcUrl, user, password }.
  private String[] datasource1 = { "jdbc:mysql://xxx/yangtze_river_app?useunicode=true&characterEncoding=utf8&useSSL=false",
      "yangtze_river_ap", "" };
  private String[] datasource2 = { "jdbc:mysql://127.0.0.1:3306/cjwb?useunicode=true&characterEncoding=utf8&serverTimezone=UTC", "cjwb",
      "xxx" };

  // Kept as fields so that stop() can release what start() created.
  private DruidPlugin plugin1;
  private ActiveRecordPlugin arp1;
  private DruidPlugin plugin2;
  private ActiveRecordPlugin arp2;

  /**
   * Starts the connection pool and the ActiveRecord configuration of both
   * data sources. Must be called before any of the query methods.
   */
  private void start() {
    plugin1 = new DruidPlugin(datasource1[0], datasource1[1], datasource1[2]);
    arp1 = new ActiveRecordPlugin(ds1, plugin1);
    plugin1.start();
    arp1.start();

    plugin2 = new DruidPlugin(datasource2[0], datasource2[1], datasource2[2]);
    arp2 = new ActiveRecordPlugin(ds2, plugin2);
    plugin2.start();
    arp2.start();
  }

  /**
   * Stops whatever {@link #start()} created, ActiveRecord plugins before
   * their connection pools. Safe to call when start() failed part-way or
   * was never called.
   */
  private void stop() {
    if (arp2 != null) {
      arp2.stop();
      arp2 = null;
    }
    if (plugin2 != null) {
      plugin2.stop();
      plugin2 = null;
    }
    if (arp1 != null) {
      arp1.stop();
      arp1 = null;
    }
    if (plugin1 != null) {
      plugin1.stop();
      plugin1 = null;
    }
  }

  /**
   * Checks that both data sources are reachable by running {@code select 1}
   * on each and logging a message when the query returns a result.
   */
  public void selectFromAllDatasource() {
    List<Record> find = Db.use(ds1).find("select 1");
    if (find != null) {
      log.info(ds1 + "连接成功");
    }

    find = Db.use(ds2).find("select 1");
    if (find != null) {
      log.info(ds2 + "连接成功");
    }
  }

  /**
   * Reads {@code water_level} rows from datasource1 and inserts them into
   * {@code river_level} on datasource2 with {@code name} set to "spider".
   *
   * <p>A source row carries a date only, the target row a date and time:
   * <pre>
   * datasource1: {site_name:宜宾, level:1, id:857fee196f53390db332060a90028cce, time:2017-01-01}
   * datasource2: {site_name:宜宾, level:1, id:857fee196f53390db332060a90028cce, time:2017-01-01 08:00}
   * </pre>
   * Every source row therefore yields up to two target rows (08:00:00 and
   * 17:00:00); a target row is only created when no row with the same
   * {@code site_name} and {@code time} exists yet.
   */
  public void fromDatasource1ToDatasource2() {
    List<Record> sourceRows = Db.use(ds1).find(SELECT_SOURCE_SQL, SOURCE_START_DATE);
    log.info("水位总条数:{}", sourceRows.size());

    List<Record> insertList = new ArrayList<Record>();
    for (Record sourceRow : sourceRows) {
      String day = sourceRow.getStr("time");
      if (day == null) {
        // Without a date the concatenation below would yield "null 08:00:00".
        log.warn("skip water_level row without time: {}", sourceRow);
        continue;
      }
      for (String suffix : TIME_OF_DAY_SUFFIXES) {
        String timestamp = day + suffix;
        if (!existsInTarget(sourceRow.getStr("site_name"), timestamp)) {
          insertList.add(newTargetRecord(sourceRow, timestamp));
        }
      }
    }

    log.info("插入的水位总条数:{}", insertList.size());
    if (insertList.isEmpty()) {
      return;
    }
    // A bounded batch size avoids sending the entire import as one JDBC batch.
    Db.use(ds2).batchSave("river_level", insertList, BATCH_SIZE);
  }

  /** Returns whether river_level already holds a row for the given site and time. */
  private boolean existsInTarget(String siteName, String timestamp) {
    Record countRecord = Db.use(ds2).findFirst(COUNT_TARGET_SQL, siteName, timestamp);
    return countRecord.getInt("count(*)") != 0;
  }

  /** Builds the river_level record for one source row at the given timestamp. */
  private Record newTargetRecord(Record sourceRow, String timestamp) {
    Record insertRecord = new Record();
    insertRecord.set("id", UUIDUtils.random());
    insertRecord.set("site_name", sourceRow.getStr("site_name"));
    insertRecord.set("level", sourceRow.getStr("level"));
    insertRecord.set("time", timestamp);
    insertRecord.set("name", "spider");
    return insertRecord;
  }

  /**
   * Diagnostic: prints the record returned by a {@code count(*)} query on
   * {@code river_level}.
   *
   * <p>Observed results: {count(*):1} when the row exists, {count(*):0}
   * when it does not.
   */
  public void getDataSourceCountResult() {
    String sql = "select count(*) from river_level where site_name='宜宾' and time='2020-08-13 08:00:00';";
    // river_level is the table of the target database, so query datasource2.
    Record findFirst = Db.use(ds2).findFirst(sql);
    System.out.println(findFirst);
  }

  /**
   * Starts both data sources, runs the import and always releases the
   * connection pools afterwards.
   */
  public static void main(String[] args) {
    WaterLevelImprotService waterLevelImprotService = new WaterLevelImprotService();
    try {
      waterLevelImprotService.start();
      waterLevelImprotService.fromDatasource1ToDatasource2();
      // Diagnostics that can be run here instead of the import:
      // selectFromAllDatasource() and getDataSourceCountResult().
    } finally {
      waterLevelImprotService.stop();
    }
  }
}
代码很简单,就不再解释了。代码还有很多可以优化的地方,例如将数据进行拆分,分线程处理
第三步:启动程序测试
第一次运行结果,运行速度很快,因为表中没有数据
第一次运行结果,运行速度很快,因为表中没有数据
第二次运行结果很慢,因为要比对 55368条数据
Db.find
Db.save
Db.template(...).find
Db.batch