基于JFinal实现的异构数据交换平台

项目诞生:项目中遇到异构数据交换的需求,在使用kettle过程中发现如果两个交换的终端数据库不能相互访问,那么Kettle无法使用,所以诞生了这个项目。

该项目已在公司内上线,目前仅支持数据的上传、下载;由于无法开源,只能提供部分核心源码进行参考。

1.    系统基本结构图

image.png

2.    接口鉴权:

采用AccessToken方式进行接口鉴权

/**
 * 通过clientid和secret获取AccessToken accessToken用户请求其他接口必备的参数
 * @param clientid
 * @param sign 加密方式 client+yyyyMMddHH+secret的MD5值
 */
public void getAccessToken(){
    String clientid = getPara("clientid");
    String sign = getPara("sign");
    if(StringUtils.isEmpty(clientid)||StringUtils.isEmpty(sign)){
    ResultBean result = new ResultBean("404","参数不完整");
    renderJson(result);
    return;
    }
    ClientDTO client = getClientById(clientid);
    if(client==null){
        ResultBean result = new ResultBean("500","客户端ID不存在");
        renderJson(result);
    }else{
        String plainText = clientid+DateFormatUtil.format(new Date(), "yyyyMMddHH")+client.getStr("secret");
        String signText = MD5.sign(plainText);
        if(signText.equals(sign)){
            //判断最大请求次数
            int reqCount = Db.queryBigDecimal("select count(*) from token t where to_char(scrq,'yyyyMMdd') = to_char(sysdate,'yyyyMMdd') and clientid = ?",clientid).intValue();
            if(reqCount>client.getBigDecimal("max_req").intValue()){
                ResultBean result = new ResultBean("502","当天获取次数超过了最大限制("+client.getBigDecimal("max_req").intValue()+")");
                renderJson(result);
            }else{
                String token = MD5.sign(clientid+new Date().getTime());
                TokenDTO tokenDTO = new TokenDTO();
                tokenDTO.set("clientid", clientid);
                tokenDTO.set("sid", client.getStr("sid"));
                tokenDTO.set("token", token);
                tokenDTO.set("expires_in", 7200);
                Calendar expires_time = Calendar.getInstance();
                expires_time.add(Calendar.SECOND, 7200);
                tokenDTO.set("expires_time",new Timestamp(expires_time.getTime().getTime()));
                tokenDTO.save();
                Kv data = Kv.by("access_token", token).set("expires_in",7200);
                ResultBean result = new ResultBean("0","accessToken获取成功",data);
                renderJson(result);
            }
        }else{
            ResultBean result = new ResultBean("501","参数校验失败");
            renderJson(result);
        }
    }
}

3.    服务端获取数据接口

        /**
	 * 用于获取数据,发送到客户端
	 */
public void getData(){
//TODO数据库事务处理,如果事务非主库事务如何处理
    String transferid = getPara("transferid");
    String clientid = getPara("clientid");
    String token = getPara("token");
    ResultBean bean = authToken(token,clientid);
    if("0".equals(bean.getErrcode())){
        TransferDataDTO dto = TransferDataDTO.dao.findById(transferid);
        if(dto==null){
            bean.setErrcode("601");
            bean.setErrmsg("数据交换不存在");
        }else{
            if(DbKit.getConfig(dto.getStr("from_ds"))==null){
                bean.setErrcode("602");
                bean.setErrmsg("数据源【"+dto.getStr("from_ds")+"】不存在");
            }else{
                Kv params = Kv.by("sid", dto.get("sid"));
                //分页为1时,执行before_exec,其他页不执行before_exec
                if(getPageNumber()==1){
                    execSql(dto,"before_exec","from_ds");
                }
                String datasql = TemplateUtils.convertTemplateToStr(dto.getStr("from_sql"), params).toUpperCase();
                    if(datasql.startsWith("DELETE")){
                        bean.setErrmsg("数据交换平台禁用了【delete】语言");
                    }else{
                        String[] sqlarr = PageSqlKit.parsePageSql(datasql);
                        Page<Record> page = Db.use(dto.getStr("from_ds")).paginate(getPageNumber(), Integer.parseInt(dto.getStr("pageSize")),sqlarr[0],sqlarr[1] );
                        bean.setData(Kv.by("totalRow", page.getTotalRow()).set("pageNumber",page.getPageNumber()).set("pageSize", page.getPageSize()).set("totalPage",page.getTotalPage()).set("dataList",page.getList()));
                        //分页查询为最后一页时执行after_exec
                        if(pageNumber==page.getPageNumber()){
                            execSql(dto,"after_exec","from_ds");
                        }
                    }
                }
            }
        }
        renderJson(bean);
    }

4.    服务端接收数据:

public void pushData(){
    String transferid = getPara("transferid");
    String clientid = getPara("clientid");
    String token = getPara("token");
    String pageNumber = getPara("pageNumber");
    String totalPage = getPara("totalPage");
    ResultBean bean = authToken(token,clientid);
    if("0".equals(bean.getErrcode())){
	TransferDataDTO dto = TransferDataDTO.dao.findById(transferid);
	if(dto==null){
		bean.setErrcode("601");
		bean.setErrmsg("数据交换不存在");
	}else{
            if(DbKit.getConfig(dto.getStr("to_ds"))==null){
		bean.setErrcode("602");
		bean.setErrmsg("数据源【"+dto.getStr("from_ds")+"】不存在");
	    }else{
		Kv params = Kv.by("sid", dto.get("sid"));		
		//分页为1时,执行before_exec,其他页不执行before_exec
		if("1".equals(pageNumber)){
        		execSql(dto,"before_exec","to_ds");
		}
		String requestStr = HttpKit.readData(getRequest());
		JSONArray requestList = JSONObject.parseArray(requestStr);
		List<TransferDataColumnDTO> columns =  TransferDataColumnDTO.dao.find("select * from pt_transfer_data_column t where t.dataid = ?",transferid);
		if(columns==null){
			bean.setErrcode("603");
			bean.setErrmsg("数据交换配置列不存在");
		}else{
	        	if("0".equals(dto.getStr("upflag"))){
				List<Object[]> insertParams = new ArrayList<Object[]>();
				StringBuffer sql = new StringBuffer("insert into "+dto.getStr("to_tablename")+"(");
				for(int c=0;c<columns.size();c++){
					TransferDataColumnDTO column = columns.get(c);
					sql.append(column.getStr("to_columnname"));
					if(c<columns.size()-1){
						sql.append(",");
					}
				}
				sql.append(") values(");
				for(int c=0;c<columns.size();c++){
					sql.append("?");
					if(c<columns.size()-1){
						sql.append(",");
					}
				}
				sql.append(")");
				for(Object data : requestList){
					JSONObject d = (JSONObject)data;
		        		Object[] param = new Object[columns.size()];
					for(int c=0;c<columns.size();c++){
			        		param[c] = getToDataPara(d,columns.get(c));
					}
					insertParams.add(param);
				}
				Db.use(dto.getStr("to_ds")).batch(sql.toString(), insertParams.toArray(new Object[0][0]), 500);
				bean.setErrcode("0");
				bean.setErrmsg("新增数据:"+requestList.size());
			}else{
				List<TransferDataColumnDTO> ukColumns = new ArrayList<TransferDataColumnDTO>();
				List<TransferDataColumnDTO> upColumns = new ArrayList<TransferDataColumnDTO>();
				//先找到唯一字段,和修改字段
				for(int c=0;c<columns.size();c++){
					TransferDataColumnDTO column = columns.get(c);
					if("1".equals(column.getStr("is_uk"))){
						ukColumns.add(column);
					}
					if("1".equals(column.getStr("upflag"))){
				        	upColumns.add(column);
					}
				}
				if(ukColumns.size()==0||upColumns.size()==0){
        				bean.setErrcode("604");
					bean.setErrmsg("数据交换配置列缺少唯一列和修改列");
				}else{
					//记录新增和修改的条数
					int upCount = 0;
					int inCount = 0;
					//拼接sql
					StringBuffer existsSql = new StringBuffer("select case when count(*)>=1 then 1 else 0 end from ").append(dto.getStr("to_tablename")).append(" t where 1=1");
					StringBuffer updateSql = new StringBuffer("update ").append(dto.getStr("to_tablename")).append(" t set ");
					for(int c=0;c<upColumns.size();c++){
						updateSql.append(" t.").append(upColumns.get(c).getStr("to_columnname")).append(" = ? ");
						if(c<upColumns.size()-1){
							updateSql.append(" , ");
						}
					}
					updateSql.append(" where 1=1 ");
					for(int c=0;c<ukColumns.size();c++){
						existsSql.append(" and t.").append(ukColumns.get(c).getStr("to_columnname")).append(" = ? ");
						updateSql.append(" and t.").append(ukColumns.get(c).getStr("to_columnname")).append(" = ? ");
					}
					StringBuffer insertSql = new StringBuffer("insert into "+dto.getStr("to_tablename")+"(");
	        				for(int c=0;c<columns.size();c++){
							TransferDataColumnDTO column = columns.get(c);
							insertSql.append(column.getStr("to_columnname"));
							if(c<columns.size()-1){
								insertSql.append(",");
							}
						}
						insertSql.append(") values(");
						for(int c=0;c<columns.size();c++){
							insertSql.append("?");
							if(c<columns.size()-1){
		        					insertSql.append(",");
							}
						}
						insertSql.append(")");
						
						//处理数据
						List<Object[]> insertParams = new ArrayList<Object[]>();
						List<Object[]> updateParams = new ArrayList<Object[]>();
						
						for(Object data : requestList){
							List<Object> upParam = new ArrayList<Object>();
			    				JSONObject d = (JSONObject)data;
							Object[] existsParam = new Object[ukColumns.size()];
							for(int c=0;c<ukColumns.size();c++){
								existsParam[c] = getToDataPara(d,ukColumns.get(c));
							}
							String existsResult = Db.use(dto.getStr("to_ds")).queryStr(existsSql.toString(),existsParam);
							if("1".equals(existsResult)){
								for(int c=0;c<upColumns.size();c++){
									upParam.add(getToDataPara(d,upColumns.get(c)));
								}
        							for(Object o : existsParam){
        			        				upParam.add(o);
        							}
        							updateParams.add(upParam.toArray());
        				    			upParam=null;
        							upCount++;
        						 }else{
								Object[] insertParam = new Object[columns.size()];
								for(int c=0;c<columns.size();c++){
								insertParam[c] = getToDataPara(d,columns.get(c));
								}
								insertParams.add(insertParam);
								inCount++;
								}
							}
							if(updateParams.size()>0){
								Db.use(dto.getStr("to_ds")).batch(updateSql.toString(),updateParams.toArray(new Object[0][0]),1000);
							}
							if(insertParams.size()>0){
								Db.use(dto.getStr("to_ds")).batch(insertSql.toString(),insertParams.toArray(new Object[0][0]),1000);
							}
							bean.setErrcode("0");
							bean.setErrmsg("新增数据:"+inCount+",修改数据:"+upCount);
							
							//分页查询为最后一页时执行after_exec
							if(pageNumber.equals(totalPage)){
								execSql(dto,"after_exec","to_ds");
							}
						}
					}
				}
			}
		}
	}
	renderJson(bean);
}

5.    客户端采用多线程方式调用服务端接口

实测数据转换运行效率比本地kettle速度更快

   //创建线程池存储数据
    ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
    for(int pageNumber=1;pageNumber<=totalPage;pageNumber++){
      	StoreDataTask task = new StoreDataTask(token,dto,columns,pageNumber,logToken);
        executor.execute(task);
    }
    //创建线程池数据    
    ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
    for(int pageNumber=1;pageNumber<=totalPage;pageNumber++){
       	PushDataTask task = new PushDataTask(token,transferDTO,pageNumber,logToken);
        executor.execute(task);
    }


6.    客户端采用webSocket进行日志回显