项目背景:项目中遇到了异构数据交换的需求。在使用 Kettle 的过程中发现,如果参与交换的两端数据库之间无法互相访问,Kettle 就无法使用,因此诞生了这个项目。
该项目已在公司内上线,目前仅支持数据的上传、下载;由于无法开源,只能提供部分核心源码进行参考。
1. 系统基本结构图
2. 接口鉴权:
采用AccessToken方式进行接口鉴权
/** * 通过clientid和secret获取AccessToken accessToken用户请求其他接口必备的参数 * @param clientid * @param sign 加密方式 client+yyyyMMddHH+secret的MD5值 */ public void getAccessToken(){ String clientid = getPara("clientid"); String sign = getPara("sign"); if(StringUtils.isEmpty(clientid)||StringUtils.isEmpty(sign)){ ResultBean result = new ResultBean("404","参数不完整"); renderJson(result); return; } ClientDTO client = getClientById(clientid); if(client==null){ ResultBean result = new ResultBean("500","客户端ID不存在"); renderJson(result); }else{ String plainText = clientid+DateFormatUtil.format(new Date(), "yyyyMMddHH")+client.getStr("secret"); String signText = MD5.sign(plainText); if(signText.equals(sign)){ //判断最大请求次数 int reqCount = Db.queryBigDecimal("select count(*) from token t where to_char(scrq,'yyyyMMdd') = to_char(sysdate,'yyyyMMdd') and clientid = ?",clientid).intValue(); if(reqCount>client.getBigDecimal("max_req").intValue()){ ResultBean result = new ResultBean("502","当天获取次数超过了最大限制("+client.getBigDecimal("max_req").intValue()+")"); renderJson(result); }else{ String token = MD5.sign(clientid+new Date().getTime()); TokenDTO tokenDTO = new TokenDTO(); tokenDTO.set("clientid", clientid); tokenDTO.set("sid", client.getStr("sid")); tokenDTO.set("token", token); tokenDTO.set("expires_in", 7200); Calendar expires_time = Calendar.getInstance(); expires_time.add(Calendar.SECOND, 7200); tokenDTO.set("expires_time",new Timestamp(expires_time.getTime().getTime())); tokenDTO.save(); Kv data = Kv.by("access_token", token).set("expires_in",7200); ResultBean result = new ResultBean("0","accessToken获取成功",data); renderJson(result); } }else{ ResultBean result = new ResultBean("501","参数校验失败"); renderJson(result); } } }
3. 服务端获取数据接口
/** * 用于获取数据,发送到客户端 */ public void getData(){ //TODO数据库事务处理,如果事务非主库事务如何处理 String transferid = getPara("transferid"); String clientid = getPara("clientid"); String token = getPara("token"); ResultBean bean = authToken(token,clientid); if("0".equals(bean.getErrcode())){ TransferDataDTO dto = TransferDataDTO.dao.findById(transferid); if(dto==null){ bean.setErrcode("601"); bean.setErrmsg("数据交换不存在"); }else{ if(DbKit.getConfig(dto.getStr("from_ds"))==null){ bean.setErrcode("602"); bean.setErrmsg("数据源【"+dto.getStr("from_ds")+"】不存在"); }else{ Kv params = Kv.by("sid", dto.get("sid")); //分页为1时,执行before_exec,其他页不执行before_exec if(getPageNumber()==1){ execSql(dto,"before_exec","from_ds"); } String datasql = TemplateUtils.convertTemplateToStr(dto.getStr("from_sql"), params).toUpperCase(); if(datasql.startsWith("DELETE")){ bean.setErrmsg("数据交换平台禁用了【delete】语言"); }else{ String[] sqlarr = PageSqlKit.parsePageSql(datasql); Page<Record> page = Db.use(dto.getStr("from_ds")).paginate(getPageNumber(), Integer.parseInt(dto.getStr("pageSize")),sqlarr[0],sqlarr[1] ); bean.setData(Kv.by("totalRow", page.getTotalRow()).set("pageNumber",page.getPageNumber()).set("pageSize", page.getPageSize()).set("totalPage",page.getTotalPage()).set("dataList",page.getList())); //分页查询为最后一页时执行after_exec if(pageNumber==page.getPageNumber()){ execSql(dto,"after_exec","from_ds"); } } } } } renderJson(bean); }
4. 服务端接收数据:
public void pushData(){ String transferid = getPara("transferid"); String clientid = getPara("clientid"); String token = getPara("token"); String pageNumber = getPara("pageNumber"); String totalPage = getPara("totalPage"); ResultBean bean = authToken(token,clientid); if("0".equals(bean.getErrcode())){ TransferDataDTO dto = TransferDataDTO.dao.findById(transferid); if(dto==null){ bean.setErrcode("601"); bean.setErrmsg("数据交换不存在"); }else{ if(DbKit.getConfig(dto.getStr("to_ds"))==null){ bean.setErrcode("602"); bean.setErrmsg("数据源【"+dto.getStr("from_ds")+"】不存在"); }else{ Kv params = Kv.by("sid", dto.get("sid")); //分页为1时,执行before_exec,其他页不执行before_exec if("1".equals(pageNumber)){ execSql(dto,"before_exec","to_ds"); } String requestStr = HttpKit.readData(getRequest()); JSONArray requestList = JSONObject.parseArray(requestStr); List<TransferDataColumnDTO> columns = TransferDataColumnDTO.dao.find("select * from pt_transfer_data_column t where t.dataid = ?",transferid); if(columns==null){ bean.setErrcode("603"); bean.setErrmsg("数据交换配置列不存在"); }else{ if("0".equals(dto.getStr("upflag"))){ List<Object[]> insertParams = new ArrayList<Object[]>(); StringBuffer sql = new StringBuffer("insert into "+dto.getStr("to_tablename")+"("); for(int c=0;c<columns.size();c++){ TransferDataColumnDTO column = columns.get(c); sql.append(column.getStr("to_columnname")); if(c<columns.size()-1){ sql.append(","); } } sql.append(") values("); for(int c=0;c<columns.size();c++){ sql.append("?"); if(c<columns.size()-1){ sql.append(","); } } sql.append(")"); for(Object data : requestList){ JSONObject d = (JSONObject)data; Object[] param = new Object[columns.size()]; for(int c=0;c<columns.size();c++){ param[c] = getToDataPara(d,columns.get(c)); } insertParams.add(param); } Db.use(dto.getStr("to_ds")).batch(sql.toString(), insertParams.toArray(new Object[0][0]), 500); bean.setErrcode("0"); bean.setErrmsg("新增数据:"+requestList.size()); }else{ List<TransferDataColumnDTO> ukColumns = new 
ArrayList<TransferDataColumnDTO>(); List<TransferDataColumnDTO> upColumns = new ArrayList<TransferDataColumnDTO>(); //先找到唯一字段,和修改字段 for(int c=0;c<columns.size();c++){ TransferDataColumnDTO column = columns.get(c); if("1".equals(column.getStr("is_uk"))){ ukColumns.add(column); } if("1".equals(column.getStr("upflag"))){ upColumns.add(column); } } if(ukColumns.size()==0||upColumns.size()==0){ bean.setErrcode("604"); bean.setErrmsg("数据交换配置列缺少唯一列和修改列"); }else{ //记录新增和修改的条数 int upCount = 0; int inCount = 0; //拼接sql StringBuffer existsSql = new StringBuffer("select case when count(*)>=1 then 1 else 0 end from ").append(dto.getStr("to_tablename")).append(" t where 1=1"); StringBuffer updateSql = new StringBuffer("update ").append(dto.getStr("to_tablename")).append(" t set "); for(int c=0;c<upColumns.size();c++){ updateSql.append(" t.").append(upColumns.get(c).getStr("to_columnname")).append(" = ? "); if(c<upColumns.size()-1){ updateSql.append(" , "); } } updateSql.append(" where 1=1 "); for(int c=0;c<ukColumns.size();c++){ existsSql.append(" and t.").append(ukColumns.get(c).getStr("to_columnname")).append(" = ? "); updateSql.append(" and t.").append(ukColumns.get(c).getStr("to_columnname")).append(" = ? 
"); } StringBuffer insertSql = new StringBuffer("insert into "+dto.getStr("to_tablename")+"("); for(int c=0;c<columns.size();c++){ TransferDataColumnDTO column = columns.get(c); insertSql.append(column.getStr("to_columnname")); if(c<columns.size()-1){ insertSql.append(","); } } insertSql.append(") values("); for(int c=0;c<columns.size();c++){ insertSql.append("?"); if(c<columns.size()-1){ insertSql.append(","); } } insertSql.append(")"); //处理数据 List<Object[]> insertParams = new ArrayList<Object[]>(); List<Object[]> updateParams = new ArrayList<Object[]>(); for(Object data : requestList){ List<Object> upParam = new ArrayList<Object>(); JSONObject d = (JSONObject)data; Object[] existsParam = new Object[ukColumns.size()]; for(int c=0;c<ukColumns.size();c++){ existsParam[c] = getToDataPara(d,ukColumns.get(c)); } String existsResult = Db.use(dto.getStr("to_ds")).queryStr(existsSql.toString(),existsParam); if("1".equals(existsResult)){ for(int c=0;c<upColumns.size();c++){ upParam.add(getToDataPara(d,upColumns.get(c))); } for(Object o : existsParam){ upParam.add(o); } updateParams.add(upParam.toArray()); upParam=null; upCount++; }else{ Object[] insertParam = new Object[columns.size()]; for(int c=0;c<columns.size();c++){ insertParam[c] = getToDataPara(d,columns.get(c)); } insertParams.add(insertParam); inCount++; } } if(updateParams.size()>0){ Db.use(dto.getStr("to_ds")).batch(updateSql.toString(),updateParams.toArray(new Object[0][0]),1000); } if(insertParams.size()>0){ Db.use(dto.getStr("to_ds")).batch(insertSql.toString(),insertParams.toArray(new Object[0][0]),1000); } bean.setErrcode("0"); bean.setErrmsg("新增数据:"+inCount+",修改数据:"+upCount); //分页查询为最后一页时执行after_exec if(pageNumber.equals(totalPage)){ execSql(dto,"after_exec","to_ds"); } } } } } } } renderJson(bean); }
5. 客户端采用多线程方式调用服务端接口
实测数据转换的运行效率高于本地 Kettle。
//创建线程池存储数据 ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize); for(int pageNumber=1;pageNumber<=totalPage;pageNumber++){ StoreDataTask task = new StoreDataTask(token,dto,columns,pageNumber,logToken); executor.execute(task); } //创建线程池数据 ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize); for(int pageNumber=1;pageNumber<=totalPage;pageNumber++){ PushDataTask task = new PushDataTask(token,transferDTO,pageNumber,logToken); executor.execute(task); }
6. 客户端采用 WebSocket 进行日志回显