JFinal使用技巧-多数据源XA分布式事务工具类

多数据源分布式事务问题在社区也多次被提及,我们业务上在几年前也有遇到过。
当时我使用的是两个db.tx 嵌套的模式,在代码中保障执行的sql 是没有问题,但是db2提交成功,db1失败的话,只能 db2再次执行删除或修改相关数据。相当于手动还原了数据。业务上到也不需要特别保障实时性,把账对上就行,只有两处需要,所以也没深研究什么分布式事务。

两个db.tx嵌套的大致代码:

DbPro db2 = Db.use("db2");
try {
    boolean tx = Db.tx(() -> db2.tx(() -> {
        try {
            // 业务处理
            return true;
        } catch (Exception e) {
            return false;
        }
    }));
    // 返回tx结果
} catch (Exception e) {
    // db2 执行删除或修改相关数据
}

如果业务上用到少不敏感的话,感觉这样弄问题也不大。


转回正题,前两天看到反馈区又在说这个多数据源分布式事务的问题,心里一直痒痒想看看这个分布式事务到底咋回事。。。
今天周末抽空通过网上搜索+AI 辅助。了解到 JDBC 里面有个XA 事务,平时用到是Connection,它用的是XAConnection。
大致来说就是 在commit提交前问了一下数据库:“老弟 are you ok ?”
prepare 方法,没问题就是返回 true,不行就是 false


问了一下 AI 代码例子,自己再整理一下为JF的DbXA工具类,废话不多说上码:

package com.demo.common.plugin.activerecord;

import com.jfinal.kit.LogKit;
import com.jfinal.kit.StrKit;
import com.jfinal.plugin.activerecord.*;

import javax.sql.DataSource;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
 * XA 事务
 */
public class DbXA {

    public static boolean tx(IAtom atom, String... configNames) {
        return tx(Connection.TRANSACTION_SERIALIZABLE, atom, configNames);
    }

    public static boolean tx(int transactionLevel, IAtom atom, String... configNames) {
        List<DbXAUnit> list = createDbXAUnitList(configNames);
        try {
            // 初始化XA链接
            init(transactionLevel, list);
            // 开始XA事务
            start(list);
            // 执行SQL操作
            boolean result = atom.run();
            if ( ! result){
                // 回滚事务
                rollback(list);
                return false;
            }
            // 结束事务的SQL操作阶段
            end(list);
            // 准备提交事务
            if (prepare(list)) {
                // 提交事务
                commit(list);
            } else {
                // 回滚事务
                rollback(list);
            }
            return true;
        } catch (NestedTransactionHelpException e) {
            // 回滚事务
            rollback(list);
            LogKit.logNothing(e);
            return false;
        } catch (Throwable t) {
            // 回滚事务
            rollback(list);
            throw t instanceof RuntimeException ? (RuntimeException) t : new ActiveRecordException(t);
        } finally {
            // 关闭连接
            close(list);
        }
    }

    private static void init(int transactionLevel, List<DbXAUnit> list) throws SQLException {
        DbXidGroup dbXidGroup = new DbXidGroup(transactionLevel);
        for (DbXAUnit unit : list) {
            unit.init(dbXidGroup.create(unit.configName));
        }
    }

    private static void start(List<DbXAUnit> list) throws XAException {
        for (DbXAUnit unit : list) {
            unit.xaResource.start(unit.xid, XAResource.TMJOIN);
        }
    }

    private static void end(List<DbXAUnit> list) throws XAException {
        for (DbXAUnit unit : list) {
            unit.xaResource.end(unit.xid, XAResource.TMSUCCESS);
        }
    }

    private static void commit(List<DbXAUnit> list) throws XAException {
        for (DbXAUnit unit : list) {
            unit.xaResource.commit(unit.xid, false);
            //unit.config.executeCallbackAfterTxCommit();
        }
    }

    private static void close(List<DbXAUnit> list) {
        for (DbXAUnit unit : list) {
            unit.close();
        }
    }

    private static void rollback(List<DbXAUnit> list) {
        for (DbXAUnit unit : list) {
            unit.rollback();
        }
    }

    private static boolean prepare(List<DbXAUnit> list) throws XAException {
        for (DbXAUnit unit : list) {
            int prepare = unit.xaResource.prepare(unit.xid);
            if (prepare != XAResource.XA_OK && prepare != XAResource.XA_RDONLY) {
                return false;
            }
        }
        return true;
    }

    private static List<DbXAUnit> createDbXAUnitList(String[] configNames) {
        List<DbXAUnit> list = new ArrayList<>(configNames.length);
        for (String configName : configNames) {
            list.add(createDbXAUnit(configName));
        }
        return list;
    }

    private static DbXAUnit createDbXAUnit(String configName) {
        Config config = DbKit.getConfig(configName);
        DataSource dataSource = config.getDataSource();
        if (!(dataSource instanceof XADataSource)) {
            throw new RuntimeException(configName + " : dataSource is not XADataSource[数据库连接池不支持XA模式]");
        }
        return new DbXAUnit(config, (XADataSource) dataSource);
    }

    public static class DbXAUnit {
        private final Config config;
        private final String configName;
        private final XADataSource xaDataSource;
        private XAConnection xaConnection;
        private XAResource xaResource;
        private Xid xid;

        public DbXAUnit(Config config, XADataSource xaDataSource) {
            this.config = config;
            this.configName = config.getName();
            this.xaDataSource = xaDataSource;
        }

        public void init(Xid xid) throws SQLException {
            this.xid = xid;
            xaConnection = xaDataSource.getXAConnection();
            xaResource = xaConnection.getXAResource();
            config.setThreadLocalConnection(xaConnection.getConnection());
        }

        public void close() {
            if (this.xaConnection != null) {
                try {
                    this.xaConnection.close();
                } catch (Throwable t) {
                    LogKit.error("关闭xa连接异常:" + configName + " : " + xid.toString() + " ," + t.getMessage(), t);
                } finally {
                    config.removeThreadLocalConnection();
                }
            }
        }

        public void rollback() {
            if (xaResource != null) {
                try {
                    xaResource.rollback(xid);
                } catch (Throwable t) {
                    LogKit.error("回滚xa连接异常:" + configName + " : " + xid.toString() + " ," + t.getMessage(), t);
                }
            }
        }

        @Override
        public String toString() {
            return "DbXAGroup{" +
                    "configName='" + configName + '\'' +
                    ", xaDataSource=" + xaDataSource +
                    ", xaConnection=" + xaConnection +
                    ", xaResource=" + xaResource +
                    ", xid=" + xid +
                    '}';
        }
    }

    public static class DbXidGroup {
        private final String groupID = StrKit.getRandomUUID();
        private final byte[] globalTransactionId = groupID.getBytes();
        private final int formatId;

        public DbXidGroup(int formatId) {
            this.formatId = formatId;
        }

        public Xid create(String configName) {
            byte[] branchQualifier = configName.getBytes();
            return new Xid() {
                @Override
                public String toString() {
                    return "Xid{" +
                            "formatId=" + formatId +
                            ", groupID='" + groupID + '\'' +
                            ", configName='" + configName + '\'' +
                            '}';
                }

                @Override
                public int getFormatId() {
                    return formatId;
                }

                @Override
                public byte[] getGlobalTransactionId() {
                    return globalTransactionId;
                }

                @Override
                public byte[] getBranchQualifier() {
                    return branchQualifier;
                }
            };
        }
    }

}


OK 保持了 JF 风格 ~

DbXA.tx(() -> {
//xxx
return true;
}, configName1, configName2, configNameN);


再写一个测试例子:

测试中我使用的数据库连接池是 BeeCpPlugin https://jfinal.com/share/2757

public static void main(String[] args){
    Prop p = PropKit.use("demo-config-dev.txt");
    BeeCpPlugin dbp = new BeeCpPlugin(p.get("jdbcUrl"), p.get("user"), p.get("password"));
    dbp.start();
    BeeCpPlugin dbp2 = new BeeCpPlugin(p.get("jdbcUrl2"), p.get("user"), p.get("password"));
    dbp2.start();

    ActiveRecordPlugin arp = new ActiveRecordPlugin(dbp);
    arp.start();
    final String configName2 = "db2";
    ActiveRecordPlugin arp2 = new ActiveRecordPlugin(configName2, dbp2);
    arp2.start();

    System.out.println("tx >>> ");
    try {
        boolean tx = DbXA.tx(() -> {
            Date date = new Date();
            Db.save("exam_info", new Record().set("exam_date", date));
            Db.use(configName2).save("exam_info", new Record().set("exam_date", date));
            error();
            return true;
        }, DbKit.MAIN_CONFIG_NAME, configName2);
        System.out.println("tx = " + tx);
    } finally {
        dbp.stop();
        dbp2.stop();
    }
}

private static void error() {
    throw new NestedTransactionHelpException("测试异常情况!");
}

使用上比多个db.tx嵌套,特别是 2 个以上嵌套要清晰一些。大家测试的时候,可以来回摩擦error() 方法,模拟不同的异常情况。

PS 注意事项:
MySQL 需要表是 InnoDB ,并且是开启XA 事务的。

可以通过以下 SQL 查询来确认:如果返回值为1,则表示 InnoDB 支持 XA 事务。

SELECT @@global.innodb_support_xa;

其他数据库配置可以询问 AI,不过现在的流行数据库默认基本都支持。


PS 多线程下使用:(待优化)

不过 XA事务也不是万能药,也有它的局限性。

以下是 AI内容:
1. **MySQL XA事务的局限性**   - **性能开销**     - **额外的日志和协调成本**:XA事务遵循两阶段提交(2PC)协议,在准备阶段和提交阶段都需要进行额外的日志记录和协调工作。这涉及到多个数据库节点(资源管理器)之间的通信和同步,会产生网络开销。例如,在分布式数据库环境中,频繁使用XA事务可能会导致网络带宽占用增加,尤其是当事务涉及多个远程数据库服务器时。     - **资源锁定时间长**:在事务执行期间,尤其是从开始事务到提交或回滚的整个过程中,资源(如数据库中的行锁、表锁)会被长时间锁定。这可能会影响系统的并发性能,导致其他事务等待锁的释放,从而降低系统的整体吞吐量。例如,在一个高并发的电商系统中,如果一个包含多个数据库操作的XA事务长时间锁定了商品库存表的相关行,其他涉及库存更新的事务就需要等待,可能会导致用户体验下降。   - **复杂的错误处理和恢复**     - **部分提交问题**:在2PC过程中,如果在准备阶段所有资源管理器都同意提交(返回`XAResource.XA_OK`),但在提交阶段某个资源管理器出现故障(如网络故障、数据库崩溃),就可能导致部分数据已经提交,部分数据未提交的情况。这种数据不一致的情况很难处理,需要复杂的恢复机制来保证数据的一致性。     - **启发式决策风险**:在出现故障时,资源管理器可能会做出启发式决策(如自动回滚或提交部分事务),这可能与全局事务的预期结果不一致。例如,一个资源管理器在等待提交指令超时后自行回滚了事务,而其他资源管理器已经成功提交,这会导致数据不一致,并且很难进行有效的错误追踪和恢复。   - **对数据库版本和配置的依赖**     - **版本兼容性问题**:不同版本的MySQL对XA事务的支持可能存在差异,一些新的特性或者优化可能在旧版本中不可用,或者在新版本中行为有所改变。这可能会导致在数据库升级或者跨版本环境中出现事务处理的兼容性问题。     - **配置要求严格**:MySQL的XA事务正常运行依赖于正确的数据库配置。例如,需要正确设置`innodb_support_xa`参数为`1`来确保InnoDB存储引擎支持XA事务。而且,数据库的日志设置、锁设置等配置参数也会影响XA事务的性能和可靠性,如果配置不当,可能会引发各种问题。 2. **MySQL XA事务的可靠性分析**   - **理论可靠性基础**     - **原子性保证**:从理论上来说,XA事务通过2PC协议确保了事务的原子性。在正常情况下,所有参与事务的资源管理器要么全部提交事务(当每个资源管理器在准备阶段都返回`XAResource.XA_OK`且提交阶段成功),要么全部回滚事务(当准备阶段有资源管理器返回非`XAResource.XA_OK`或者提交阶段出现故障)。这种机制保证了分布式事务作为一个整体的原子性,避免了数据的部分更新。     - **一致性维护**:如果所有资源管理器都正确遵循XA事务协议,并且没有出现启发式决策等异常情况,XA事务可以维护数据的一致性。例如,在银行转账系统中,从一个账户扣款和在另一个账户收款这两个数据库操作作为一个XA事务,只要事务正常执行,就能保证账户余额的一致性。   - **实际可靠性挑战**     - **网络和硬件故障影响**:尽管XA事务有机制来应对故障,但在实际环境中,网络故障、服务器硬件故障等不可预见的情况可能会干扰事务的正常执行。例如,在跨数据中心的XA事务中,如果网络出现分区故障,可能会导致事务的协调信息丢失或者延迟,从而影响事务的可靠性。     - **软件和配置错误风险**:软件的Bug(如数据库软件自身的Bug或者应用程序中XA事务处理代码的错误)以及不正确的数据库配置都可能降低XA事务的可靠性。例如,应用程序中错误地处理了XA事务的返回结果,或者数据库的事务隔离级别设置与XA事务不兼容,都可能导致数据不一致或者事务失败。 总体而言,MySQL XA事务提供了一种处理分布式事务的方法,但在使用时需要充分考虑其局限性,并且要谨慎地进行错误处理和系统配置,以确保在实际应用中的可靠性。在一些对事务一致性要求极高的场景下,可能还需要结合其他技术(如消息队列、补偿机制等)来进一步增强系统的可靠性。


虽然不是万能药,但问题也不大,,,数据库的稳定性,用的阿里云数据库,到也没出过问题。毕竟嵌套都没问题。。。

作为一个学习参考了~ 看完不要忘记点赞哈!

评论区

JFinal

2024-11-10 10:40

这个功能有不少同学需要,相当有价值,点赞 + 收藏, 以后有同学问,直接上链接

北流家园网

2024-11-11 10:09

杜总出品,必属精品,点赞,收藏

zzutligang

2024-11-12 17:48

如果是同一个线程,引入atomikos或则btm也是可以轻松解决多数据源xa事务的问题。但最麻烦的还是如果在多个线程里,xa事务的问题。

杜福忠

2024-11-12 19:30

@zzutligang 都手撸XA 了,多线程下也就是共享config.setThreadLocalConnection(connection);变量的事情了,没啥麻烦的。看我加两行代码就解决了

杜福忠

2024-11-12 20:19

@zzutligang 加了一个DbThreadLocalRunnable工具

zzutligang

2024-11-13 09:53

@杜福忠 杜总有空能不能研究一下,如果是在不同的jvm进程里呢?有没有解决方案,我是在这个地方卡住了。其实再说清楚一点,我是在这一台机器的jvm进程里通过rpc(tcp协议)调另一台机器的jvm进程下的一个方法。所以,我想必须要有一个事务协调器角色,所有事务都和事务协调器通信,告诉协调器自己是否ok,如果所有都ok,由协调器去控制所有xa事务提交,否则,就控制所有事务回滚。我理解是事务发起者先生成一个id,告诉事务协调器,然后把这个id和自己的xid发给事务协调器,调用rpc的时候,把这个id传递过去,rpc方法拿到这个id和自己的xid也去事务协调器注册自己的事务,协调器就可以把所有同一id的xid组成一个事务,然后每个进程里的xa事务都告诉协调器,自己是ok还是不ok,由协调器统一提交和回滚。到这里我弄不下去了。搁置了有快一年的时间了。

杜福忠

2024-11-13 14:48

@zzutligang 市面没有这类业务的框架吗? 虽然代码可实现,但是性能应该会非常差。可以考虑用其他的方案,比如业务需要的数据整合到一个库,用事物完成第一阶段数据整合,再到各应用下取各自需要的数据。或者独立出来应用,把需要的数据源整合在一起,在一个 jvm 里面使用,再用rpc或 http或 sql 取结果数据也是可拆分的。如果非得实现,我想我会用Redis做一个事物组,把需要的xa 数据源都标记在哪个位置,然后用Redis锁,让大家都等业务执行完了, 再用发布信息让大家都提交或回滚处理。

lyh061619

2024-11-15 08:54

@JFinal 是否考虑融合到jf底层,多数据源这个也有可能是刚需,看也是直接使用jdbc XAConnection

zzutligang

2024-11-17 14:32

@杜福忠 市面上唯一能上生产的就是seata,但它好像只支持dubbo的rpc框架。我的rpc是基于thrift自己实现的。

杜福忠

2024-11-17 15:50

@zzutligang 加几行代码能实现的功能就自己加了,但是要加好多类才能实现的功能。。。那用现成框架更方便了。如果不想搞太麻烦,我就建议搞一个 jvm应用,接收一组 jdbc 和一组 sql 供执行,动态的,用完就关闭完事儿,公共的大家一起用性能还高

热门分享

扫码入社