上次ShardingSphere的分布式事务经过测试
所有数据源必须是相同的数据库类型;否则抛出异常:Database type inconsistent with '%s' and '%s';其数据库类型根据connection.getMetaData().getDatabaseProductName()
查看了seata框架发现要搭建fescar-server,我只想在单独项目里面实现多数据库事务所以未做深入了解。
了解到相关实现jta框架可以实现我的需求,就进行了集成到jfinal的相关测试。
选用jta的atomike实现
1)在本机不同数据库上建立数据库(mysql,sqlserver,oracle)
CREATE TABLE test (
id int(11) NOT NULL,
PRIMARY KEY (id)
)
2)加入atomikos的jar
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jdbc</artifactId>
<version>5.0.0</version>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
3)定义插件AtomikosPlugin类(更多数据库设置参数未实现)
package com.csnt.source.plugin.atomikos;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.jfinal.kit.LogKit;
import com.jfinal.kit.StrKit;
import com.jfinal.plugin.IPlugin;
import com.jfinal.plugin.activerecord.ActiveRecordException;
import com.jfinal.plugin.activerecord.IAtom;
import com.jfinal.plugin.activerecord.IDataSourceProvider;
import javax.sql.DataSource;
import java.util.Properties;
/**
* @author source
*/
public class AtomikosPlugin implements IPlugin, IDataSourceProvider {
private String name = null;
private String url;
private String username;
private String password;
private String xaDataSourceStr;
private int minPoolSize = 10;
private int maxPoolSize = 32;
private AtomikosDataSourceBean ds;
private boolean localTransactionMode = true;
private volatile boolean isStarted = false;
public AtomikosPlugin(String url, String username, String password, String xaDataSourceStr) {
this.url = url;
this.username = username;
this.password = password;
this.xaDataSourceStr = xaDataSourceStr;
}
public AtomikosPlugin set(int minPoolSize, int maxPoolSize) {
this.minPoolSize = minPoolSize;
this.maxPoolSize = maxPoolSize;
return this;
}
public final AtomikosPlugin setLocalTransactionMode(boolean localTransactionMode) {
this.localTransactionMode = localTransactionMode;
return this;
}
public final String getName() {
return this.name;
}
public final void setName(String name) {
this.name = name;
}
public static boolean tx(IAtom atom) {
UserTransactionImp utx = new UserTransactionImp();
try {
utx.begin();
boolean result = atom.run();
if (result) {
// 提交事务
utx.commit();
} else {
//回滚事务
utx.rollback();
}
return result;
} catch (Throwable t) {
try {
utx.rollback();
} catch (Exception e) {
LogKit.error(e.getMessage(), e);
}
throw t instanceof RuntimeException ? (RuntimeException) t : new ActiveRecordException(t);
}
}
@Override
public boolean start() {
if (isStarted) {
return true;
} else {
ds = new AtomikosDataSourceBean();
if (StrKit.notBlank(name)) {
ds.setUniqueResourceName(name);
}
ds.setMinPoolSize(minPoolSize);
ds.setMaxPoolSize(maxPoolSize);
//是否开启本地事务与jta事务混合
ds.setLocalTransactionMode(localTransactionMode);
ds.setXaDataSourceClassName(xaDataSourceStr);
Properties properties = new Properties();
properties.put("URL", url);
properties.put("user", username);
properties.put("password", password);
ds.setXaProperties(properties);
this.isStarted = true;
return true;
}
}
@Override
public boolean stop() {
if (ds != null) {
ds.close();
}
ds = null;
isStarted = false;
return true;
}
@Override
public DataSource getDataSource() {
return ds;
}
}
4)设置数据库配置信息
/**
* 配置数据库 Atomikos
*/
private void configDbForAtomikos(Plugins me) {
loadPropertyFile("database.properties");
String dbConfig = getProperty("database.register", "main");
String[] dataBases = dbConfig.split(",");
for (String dbKey : dataBases) {
AtomikosPlugin ap = new AtomikosPlugin(getProperty(dbKey + ".jdbcUrl"), getProperty(dbKey + ".user"), getProperty(dbKey + ".password"), getProperty(dbKey + ".xa.datasource"));
ap.setName(dbKey);
// //设置连接池数量
// if (StringUtil.isNotEmpty(getPropertyToInt(dbKey + ".initialSize"), getPropertyToInt(dbKey + ".minIdle"), getPropertyToInt(dbKey + ".maxActive"))) {
// ap.set(getPropertyToInt(dbKey + ".initialSize"), getPropertyToInt(dbKey + ".minIdle"), getPropertyToInt(dbKey + ".maxActive"));
// }
ActiveRecordPlugin arp;
if ("main".equals(dbKey)) {
arp = new ActiveRecordPlugin(ap);
} else {
arp = new ActiveRecordPlugin(dbKey, ap);
}
arp.setDialect(getDbDialectByDriver(getProperty(dbKey + ".driver")));
arp.setContainerFactory(new CaseInsensitiveContainerFactory());
arp.setShowSql(true);
me.add(ap);
me.add(arp);
}
}
#数据库配置注册器,配置到此处的数据库,自动注册到程序
database.register=main,test1,test2
#region main数据库
main.driver=com.mysql.cj.jdbc.Driver
main.xa.datasource=com.mysql.cj.jdbc.MysqlXADataSource
main.jdbcUrl=jdbc:mysql://127.0.0.1:3306/source?serverTimezone=Asia/Shanghai&characterEncoding=utf8
main.user=
main.password=
#endregion
test1.driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
test1.xa.datasource=com.microsoft.sqlserver.jdbc.SQLServerXADataSource
test1.jdbcUrl=jdbc:sqlserver://127.0.0.1:1433;databaseName=test
test1.user=
test1.password=
test2.driver=oracle.jdbc.OracleDriver
test2.xa.datasource=oracle.jdbc.xa.client.OracleXADataSource
test2.jdbcUrl=jdbc:oracle:thin:@127.0.0.1:1521:XE
test2.user=
test2.password=
5)测试代码
Record record = new Record();
record.set("id", 1);
AtomikosPlugin.tx(() -> {
Db.save("test", record);
Db.use("test1").save("test", record);
Db.use("test2").save("test", record);
return false;
});
6)待完善问题
Atomike更多参数调优设置,性能测试
7)实际运行中问题处理
Oracle XA事务授权
Oracle 授权XA事务脚本
GRANT SELECT ON
sys.dba_pending_transactions TO name;
GRANT SELECT ON
sys.pending_trans$ TO name;
GRANT SELECT ON
sys.dba_2pc_pending TO name;
GRANT execute ON sys.dbms_system TO name;
sqlserver配置XA事务 (https://www.ibm.com/support/knowledgecenter/zh/SSTLXK_8.5.5/com.ibm.wbpm.imuc.ebpm.doc/topics/db_xa_nd_win_man.html)
减少Atomikos日志
在log4j.properties追加配置
#override atomikos info msg
log4j.logger.com.atomikos=WARN
修改Atomikos日志路径
jta.properties修改 com.atomikos.icatch.log_base_dir=./logs/transprovincially
修改Atomikos的localTransactionMode属性为true
不修改单独使用本地事务会报错
PS.本人是新手如果有错误敬请谅解,本项目只是测试项目。
参考资料:
jfinal数据库事务相关源码
记一次Atomikos分布式事务的使用 https://www.jianshu.com/p/86b4ab4f2d18
Db.tx(() -> {
return Db.use("test1").tx(() -> {
return Db.use("test2").tx(() -> {
Db.save("test", record);
Db.use("test1").save("test", record);
Db.use("test2").save("test", record);
return false;
});
});
});