项目诞生:项目中遇到异构数据交换的需求,在使用kettle过程中发现如果两个交换的终端数据库不能相互访问,那么Kettle无法使用,所以诞生了这个项目。
该项目已在公司内上线,目前仅支持数据的上传、下载;由于无法开源,只能提供部分核心源码进行参考。
1. 系统基本结构图

2. 接口鉴权:
采用AccessToken方式进行接口鉴权
/**
* 通过clientid和secret获取AccessToken accessToken用户请求其他接口必备的参数
* @param clientid
* @param sign 加密方式 client+yyyyMMddHH+secret的MD5值
*/
public void getAccessToken(){
String clientid = getPara("clientid");
String sign = getPara("sign");
if(StringUtils.isEmpty(clientid)||StringUtils.isEmpty(sign)){
ResultBean result = new ResultBean("404","参数不完整");
renderJson(result);
return;
}
ClientDTO client = getClientById(clientid);
if(client==null){
ResultBean result = new ResultBean("500","客户端ID不存在");
renderJson(result);
}else{
String plainText = clientid+DateFormatUtil.format(new Date(), "yyyyMMddHH")+client.getStr("secret");
String signText = MD5.sign(plainText);
if(signText.equals(sign)){
//判断最大请求次数
int reqCount = Db.queryBigDecimal("select count(*) from token t where to_char(scrq,'yyyyMMdd') = to_char(sysdate,'yyyyMMdd') and clientid = ?",clientid).intValue();
if(reqCount>client.getBigDecimal("max_req").intValue()){
ResultBean result = new ResultBean("502","当天获取次数超过了最大限制("+client.getBigDecimal("max_req").intValue()+")");
renderJson(result);
}else{
String token = MD5.sign(clientid+new Date().getTime());
TokenDTO tokenDTO = new TokenDTO();
tokenDTO.set("clientid", clientid);
tokenDTO.set("sid", client.getStr("sid"));
tokenDTO.set("token", token);
tokenDTO.set("expires_in", 7200);
Calendar expires_time = Calendar.getInstance();
expires_time.add(Calendar.SECOND, 7200);
tokenDTO.set("expires_time",new Timestamp(expires_time.getTime().getTime()));
tokenDTO.save();
Kv data = Kv.by("access_token", token).set("expires_in",7200);
ResultBean result = new ResultBean("0","accessToken获取成功",data);
renderJson(result);
}
}else{
ResultBean result = new ResultBean("501","参数校验失败");
renderJson(result);
}
}
}3. 服务端获取数据接口
/**
* 用于获取数据,发送到客户端
*/
public void getData(){
//TODO数据库事务处理,如果事务非主库事务如何处理
String transferid = getPara("transferid");
String clientid = getPara("clientid");
String token = getPara("token");
ResultBean bean = authToken(token,clientid);
if("0".equals(bean.getErrcode())){
TransferDataDTO dto = TransferDataDTO.dao.findById(transferid);
if(dto==null){
bean.setErrcode("601");
bean.setErrmsg("数据交换不存在");
}else{
if(DbKit.getConfig(dto.getStr("from_ds"))==null){
bean.setErrcode("602");
bean.setErrmsg("数据源【"+dto.getStr("from_ds")+"】不存在");
}else{
Kv params = Kv.by("sid", dto.get("sid"));
//分页为1时,执行before_exec,其他页不执行before_exec
if(getPageNumber()==1){
execSql(dto,"before_exec","from_ds");
}
String datasql = TemplateUtils.convertTemplateToStr(dto.getStr("from_sql"), params).toUpperCase();
if(datasql.startsWith("DELETE")){
bean.setErrmsg("数据交换平台禁用了【delete】语言");
}else{
String[] sqlarr = PageSqlKit.parsePageSql(datasql);
Page<Record> page = Db.use(dto.getStr("from_ds")).paginate(getPageNumber(), Integer.parseInt(dto.getStr("pageSize")),sqlarr[0],sqlarr[1] );
bean.setData(Kv.by("totalRow", page.getTotalRow()).set("pageNumber",page.getPageNumber()).set("pageSize", page.getPageSize()).set("totalPage",page.getTotalPage()).set("dataList",page.getList()));
//分页查询为最后一页时执行after_exec
if(pageNumber==page.getPageNumber()){
execSql(dto,"after_exec","from_ds");
}
}
}
}
}
renderJson(bean);
}4. 服务端接收数据:
public void pushData(){
String transferid = getPara("transferid");
String clientid = getPara("clientid");
String token = getPara("token");
String pageNumber = getPara("pageNumber");
String totalPage = getPara("totalPage");
ResultBean bean = authToken(token,clientid);
if("0".equals(bean.getErrcode())){
TransferDataDTO dto = TransferDataDTO.dao.findById(transferid);
if(dto==null){
bean.setErrcode("601");
bean.setErrmsg("数据交换不存在");
}else{
if(DbKit.getConfig(dto.getStr("to_ds"))==null){
bean.setErrcode("602");
bean.setErrmsg("数据源【"+dto.getStr("from_ds")+"】不存在");
}else{
Kv params = Kv.by("sid", dto.get("sid"));
//分页为1时,执行before_exec,其他页不执行before_exec
if("1".equals(pageNumber)){
execSql(dto,"before_exec","to_ds");
}
String requestStr = HttpKit.readData(getRequest());
JSONArray requestList = JSONObject.parseArray(requestStr);
List<TransferDataColumnDTO> columns = TransferDataColumnDTO.dao.find("select * from pt_transfer_data_column t where t.dataid = ?",transferid);
if(columns==null){
bean.setErrcode("603");
bean.setErrmsg("数据交换配置列不存在");
}else{
if("0".equals(dto.getStr("upflag"))){
List<Object[]> insertParams = new ArrayList<Object[]>();
StringBuffer sql = new StringBuffer("insert into "+dto.getStr("to_tablename")+"(");
for(int c=0;c<columns.size();c++){
TransferDataColumnDTO column = columns.get(c);
sql.append(column.getStr("to_columnname"));
if(c<columns.size()-1){
sql.append(",");
}
}
sql.append(") values(");
for(int c=0;c<columns.size();c++){
sql.append("?");
if(c<columns.size()-1){
sql.append(",");
}
}
sql.append(")");
for(Object data : requestList){
JSONObject d = (JSONObject)data;
Object[] param = new Object[columns.size()];
for(int c=0;c<columns.size();c++){
param[c] = getToDataPara(d,columns.get(c));
}
insertParams.add(param);
}
Db.use(dto.getStr("to_ds")).batch(sql.toString(), insertParams.toArray(new Object[0][0]), 500);
bean.setErrcode("0");
bean.setErrmsg("新增数据:"+requestList.size());
}else{
List<TransferDataColumnDTO> ukColumns = new ArrayList<TransferDataColumnDTO>();
List<TransferDataColumnDTO> upColumns = new ArrayList<TransferDataColumnDTO>();
//先找到唯一字段,和修改字段
for(int c=0;c<columns.size();c++){
TransferDataColumnDTO column = columns.get(c);
if("1".equals(column.getStr("is_uk"))){
ukColumns.add(column);
}
if("1".equals(column.getStr("upflag"))){
upColumns.add(column);
}
}
if(ukColumns.size()==0||upColumns.size()==0){
bean.setErrcode("604");
bean.setErrmsg("数据交换配置列缺少唯一列和修改列");
}else{
//记录新增和修改的条数
int upCount = 0;
int inCount = 0;
//拼接sql
StringBuffer existsSql = new StringBuffer("select case when count(*)>=1 then 1 else 0 end from ").append(dto.getStr("to_tablename")).append(" t where 1=1");
StringBuffer updateSql = new StringBuffer("update ").append(dto.getStr("to_tablename")).append(" t set ");
for(int c=0;c<upColumns.size();c++){
updateSql.append(" t.").append(upColumns.get(c).getStr("to_columnname")).append(" = ? ");
if(c<upColumns.size()-1){
updateSql.append(" , ");
}
}
updateSql.append(" where 1=1 ");
for(int c=0;c<ukColumns.size();c++){
existsSql.append(" and t.").append(ukColumns.get(c).getStr("to_columnname")).append(" = ? ");
updateSql.append(" and t.").append(ukColumns.get(c).getStr("to_columnname")).append(" = ? ");
}
StringBuffer insertSql = new StringBuffer("insert into "+dto.getStr("to_tablename")+"(");
for(int c=0;c<columns.size();c++){
TransferDataColumnDTO column = columns.get(c);
insertSql.append(column.getStr("to_columnname"));
if(c<columns.size()-1){
insertSql.append(",");
}
}
insertSql.append(") values(");
for(int c=0;c<columns.size();c++){
insertSql.append("?");
if(c<columns.size()-1){
insertSql.append(",");
}
}
insertSql.append(")");
//处理数据
List<Object[]> insertParams = new ArrayList<Object[]>();
List<Object[]> updateParams = new ArrayList<Object[]>();
for(Object data : requestList){
List<Object> upParam = new ArrayList<Object>();
JSONObject d = (JSONObject)data;
Object[] existsParam = new Object[ukColumns.size()];
for(int c=0;c<ukColumns.size();c++){
existsParam[c] = getToDataPara(d,ukColumns.get(c));
}
String existsResult = Db.use(dto.getStr("to_ds")).queryStr(existsSql.toString(),existsParam);
if("1".equals(existsResult)){
for(int c=0;c<upColumns.size();c++){
upParam.add(getToDataPara(d,upColumns.get(c)));
}
for(Object o : existsParam){
upParam.add(o);
}
updateParams.add(upParam.toArray());
upParam=null;
upCount++;
}else{
Object[] insertParam = new Object[columns.size()];
for(int c=0;c<columns.size();c++){
insertParam[c] = getToDataPara(d,columns.get(c));
}
insertParams.add(insertParam);
inCount++;
}
}
if(updateParams.size()>0){
Db.use(dto.getStr("to_ds")).batch(updateSql.toString(),updateParams.toArray(new Object[0][0]),1000);
}
if(insertParams.size()>0){
Db.use(dto.getStr("to_ds")).batch(insertSql.toString(),insertParams.toArray(new Object[0][0]),1000);
}
bean.setErrcode("0");
bean.setErrmsg("新增数据:"+inCount+",修改数据:"+upCount);
//分页查询为最后一页时执行after_exec
if(pageNumber.equals(totalPage)){
execSql(dto,"after_exec","to_ds");
}
}
}
}
}
}
}
renderJson(bean);
}5. 客户端采用多线程方式调用服务端接口
实测数据转换运行效率比本地kettle速度更快
//创建线程池存储数据
ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
for(int pageNumber=1;pageNumber<=totalPage;pageNumber++){
StoreDataTask task = new StoreDataTask(token,dto,columns,pageNumber,logToken);
executor.execute(task);
}
//创建线程池数据
ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
for(int pageNumber=1;pageNumber<=totalPage;pageNumber++){
PushDataTask task = new PushDataTask(token,transferDTO,pageNumber,logToken);
executor.execute(task);
}6. 客户端采用webSocket进行日志回显