Jfinal引入Atomikos的后续完善(1)

上文jfinal集成分布式数据库事务框架后,进行测试时出现的各种问题解决及代码完善。


l  简化版SchedulerPluginScheduledThreadPlugin

l  使用winsw注册Windows服务,可实现服务正常停止

l  集成Atomikos实现分布式数据库事务,建立DbAtomikosKit支持数据库批量操作


1)插件AtomikosPlugin完善

package com.csnt.source.plugin.atomikos;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.atomikos.jdbc.AtomikosNonXADataSourceBean;
import com.atomikos.jdbc.internal.AbstractDataSourceBean;
import com.jfinal.kit.LogKit;
import com.jfinal.kit.StrKit;
import com.jfinal.plugin.IPlugin;
import com.jfinal.plugin.activerecord.ActiveRecordException;
import com.jfinal.plugin.activerecord.IAtom;
import com.jfinal.plugin.activerecord.IDataSourceProvider;

import javax.sql.DataSource;
import java.util.Properties;


/**
 * @author source
 */
public class AtomikosPlugin implements IPlugin, IDataSourceProvider {
    private String name = null;
    private String url;
    private String username;
    private String password;
    private String xaDataSourceStr;
    private String driverClassName;

    private int minPoolSize = 5;
    private int maxPoolSize = 32;

    private AbstractDataSourceBean ds;
    private boolean localTransactionMode = true;
    private volatile boolean isStarted = false;

    public AtomikosPlugin(String url, String username, String password, String xaDataSourceStr, String driverClassName) {
        this.url = url;
        this.username = username;
        this.password = password;
        this.xaDataSourceStr = xaDataSourceStr;
        this.driverClassName = driverClassName;
    }

    public AtomikosPlugin set(int minPoolSize, int maxPoolSize) {
        this.minPoolSize = minPoolSize;
        this.maxPoolSize = maxPoolSize;
        return this;
    }

    public final AtomikosPlugin setLocalTransactionMode(boolean localTransactionMode) {
        this.localTransactionMode = localTransactionMode;
        return this;
    }

    public final String getName() {
        return this.name;
    }

    public final void setName(String name) {
        this.name = name;
    }

    public static boolean tx(IAtom atom) {
        UserTransactionImp utx = new UserTransactionImp();
        try {
            utx.begin();
            boolean result = atom.run();
            if (result) {
                // 提交事务
                utx.commit();
            } else {
                //回滚事务
                utx.rollback();
            }
            return result;
        } catch (Throwable t) {
            try {
                utx.rollback();
            } catch (Exception e) {
                LogKit.error(e.getMessage(), e);
            }
            throw t instanceof RuntimeException ? (RuntimeException) t : new ActiveRecordException(t);
        }
    }

    @Override
    public boolean start() {
        if (isStarted) {
            return true;
        } else {
            if (StrKit.notBlank(xaDataSourceStr)) {
                AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
                atomikosDataSourceBean.setXaDataSourceClassName(xaDataSourceStr);
                atomikosDataSourceBean.setMinPoolSize(minPoolSize);
                atomikosDataSourceBean.setMaxPoolSize(maxPoolSize);
                //是否开启本地事务与jta事务混合
                atomikosDataSourceBean.setLocalTransactionMode(localTransactionMode);
                Properties properties = new Properties();
                properties.put("URL", url);
                properties.put("user", username);
                properties.put("password", password);
                atomikosDataSourceBean.setXaProperties(properties);
                ds = atomikosDataSourceBean;
            } else {
                AtomikosNonXADataSourceBean atomikosNonXADataSourceBean = new AtomikosNonXADataSourceBean();

                atomikosNonXADataSourceBean.setLocalTransactionMode(localTransactionMode);
                atomikosNonXADataSourceBean.setUrl(url);
                atomikosNonXADataSourceBean.setUser(username);
                atomikosNonXADataSourceBean.setPassword(password);
                atomikosNonXADataSourceBean.setDriverClassName(driverClassName);
                ds = atomikosNonXADataSourceBean;
            }
            if (StrKit.notBlank(name)) {
                ds.setUniqueResourceName(name);
            }
            this.isStarted = true;
            return true;
        }
    }

    @Override
    public boolean stop() {
        if (ds != null) {
            ds.close();
        }

        ds = null;
        isStarted = false;
        return true;
    }

    @Override
    public DataSource getDataSource() {
        return ds;
    }

}

2)由于事务交托给Atomikos进行管理,jfinal批量操作报错

改写批量保存方法

package com.csnt.source.utils;

import com.jfinal.kit.StrKit;
import com.jfinal.plugin.activerecord.*;
import com.jfinal.plugin.activerecord.dialect.Dialect;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;

@SuppressWarnings("Duplicates")
public class DbAtomikosKit {
    private DbAtomikosKit() {
    }

    private static int[] batch(Connection conn, String sql, Object[][] paras) throws SQLException {
        if (paras == null || paras.length == 0) {
            return new int[0];
        }

        int pointer = 0;
        int[] result = new int[paras.length];
        PreparedStatement pst = conn.prepareStatement(sql);
        for (int i = 0; i < paras.length; i++) {
            for (int j = 0; j < paras[i].length; j++) {
                Object value = paras[i][j];
                if (value instanceof java.util.Date) {
                    if (value instanceof java.sql.Date) {
                        pst.setDate(j + 1, (java.sql.Date) value);
                    } else if (value instanceof java.sql.Timestamp) {
                        pst.setTimestamp(j + 1, (java.sql.Timestamp) value);
                    } else {
                        // Oracle、SqlServer 中的 TIMESTAMP、DATE 支持 new Date() 给值
                        java.util.Date d = (java.util.Date) value;
                        pst.setTimestamp(j + 1, new java.sql.Timestamp(d.getTime()));
                    }
                } else {
                    pst.setObject(j + 1, value);
                }
            }
            pst.addBatch();

        }
        int[] r = pst.executeBatch();
        for (int k = 0; k < r.length; k++) {
            result[pointer++] = r[k];
        }
        close(pst);
        return result;
    }

    /**
     * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
     * <pre>
     * Example:
     * String sql = "insert into user(name, cash) values(?, ?)";
     * int[] result = Db.use().batch(sql, new Object[][]{{"James", 888}, {"zhanjin", 888}});
     * </pre>
     *
     * @param sql   The SQL to execute.
     * @param paras An array of query replacement parameters.  Each row in this array is one set of batch replacement values.
     * @return The number of rows updated per statement
     */
    public static int[] batch(DbPro dbPro, String sql, Object[][] paras) {
        Connection conn = null;
        Config config = dbPro.getConfig();
        try {
            conn = config.getConnection();
            return batch(conn, sql, paras);
        } catch (Exception e) {
            throw new ActiveRecordException(e);
        } finally {
            config.close(conn);
        }
    }

    private static int[] batch(Connection conn, String sql, String columns, List list) throws SQLException {
        if (list == null || list.size() == 0) {
            return new int[0];
        }
        Object element = list.get(0);
        if (!(element instanceof Record) && !(element instanceof Model)) {
            throw new IllegalArgumentException("The element in list must be Model or Record.");
        }
        boolean isModel = element instanceof Model;

        String[] columnArray = columns.split(",");
        for (int i = 0; i < columnArray.length; i++) {
            columnArray[i] = columnArray[i].trim();
        }

        int pointer = 0;
        int size = list.size();
        int[] result = new int[size];
        PreparedStatement pst = conn.prepareStatement(sql);
        for (int i = 0; i < size; i++) {
            Map map = isModel ? getAttrs((Model) list.get(i)) : ((Record) list.get(i)).getColumns();
            for (int j = 0; j < columnArray.length; j++) {
                Object value = map.get(columnArray[j]);
                if (value instanceof java.util.Date) {
                    if (value instanceof java.sql.Date) {
                        pst.setDate(j + 1, (java.sql.Date) value);
                    } else if (value instanceof java.sql.Timestamp) {
                        pst.setTimestamp(j + 1, (java.sql.Timestamp) value);
                    } else {
                        // Oracle、SqlServer 中的 TIMESTAMP、DATE 支持 new Date() 给值
                        java.util.Date d = (java.util.Date) value;
                        pst.setTimestamp(j + 1, new java.sql.Timestamp(d.getTime()));
                    }
                } else {
                    pst.setObject(j + 1, value);
                }
            }
            pst.addBatch();
        }
        int[] r = pst.executeBatch();
        for (int k = 0; k < r.length; k++) {
            result[pointer++] = r[k];
        }
        close(pst);
        return result;
    }

    /**
     * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
     * <pre>
     * Example:
     * String sql = "insert into user(name, cash) values(?, ?)";
     * int[] result = Db.use().batch(sql, "name, cash", modelList, 500);
     * </pre>
     *
     * @param sql               The SQL to execute.
     * @param columns           the columns need be processed by sql.
     * @param modelOrRecordList model or record object list.
     * @return The number of rows updated per statement
     */
    public static int[] batch(DbPro dbPro, String sql, String columns, List modelOrRecordList) {
        Connection conn = null;
        Config config = dbPro.getConfig();
        try {
            conn = config.getConnection();
            return batch(conn, sql, columns, modelOrRecordList);
        } catch (Exception e) {
            throw new ActiveRecordException(e);
        } finally {
            config.close(conn);
        }
    }

    private static int[] batch(Connection conn, List<String> sqlList) throws SQLException {
        if (sqlList == null || sqlList.size() == 0) {
            return new int[0];
        }

        int pointer = 0;
        int size = sqlList.size();
        int[] result = new int[size];
        Statement st = conn.createStatement();
        for (int i = 0; i < size; i++) {
            st.addBatch(sqlList.get(i));

        }
        int[] r = st.executeBatch();
        for (int k = 0; k < r.length; k++) {
            result[pointer++] = r[k];
        }
        close(st);
        return result;
    }

    /**
     * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
     * <pre>
     * Example:
     * int[] result = Db.use().batch(sqlList, 500);
     * </pre>
     *
     * @param sqlList The SQL list to execute.
     * @return The number of rows updated per statement
     */
    public static int[] batch(DbPro dbPro, List<String> sqlList) {
        Connection conn = null;
        Config config = dbPro.getConfig();
        try {
            conn = config.getConnection();
            return batch(conn, sqlList);
        } catch (Exception e) {
            throw new ActiveRecordException(e);
        } finally {
            config.close(conn);
        }
    }

    /**
     * Batch save models using the "insert into ..." sql generated by the first model in modelList.
     * Ensure all the models can use the same sql as the first model.
     */
    public static int[] batchSave(DbPro dbPro, List<? extends Model> modelList) {
        if (modelList == null || modelList.size() == 0) {
            return new int[0];
        }

        Dialect dialect = dbPro.getConfig().getDialect();

        Model model = modelList.get(0);
        Map<String, Object> attrs = getAttrs(model);
        int index = 0;
        StringBuilder columns = new StringBuilder();
        // the same as the iterator in Dialect.forModelSave() to ensure the order of the attrs
        for (Map.Entry<String, Object> e : attrs.entrySet()) {
            if (dialect.isOracle()) {    // 支持 oracle 自增主键
                Object value = e.getValue();
                if (value instanceof String && ((String) value).endsWith(".nextval")) {
                    continue;
                }
            }

            if (index++ > 0) {
                columns.append(',');
            }
            columns.append(e.getKey());
        }

        StringBuilder sql = new StringBuilder();
        List<Object> parasNoUse = new ArrayList<Object>();
        dialect.forModelSave(TableMapping.me().getTable(model.getClass()), attrs, sql, parasNoUse);
        return batch(dbPro, sql.toString(), columns.toString(), modelList);
    }

    /**
     * Batch save records using the "insert into ..." sql generated by the first record in recordList.
     * Ensure all the record can use the same sql as the first record.
     *
     * @param tableName the table name
     */
    public static int[] batchSave(DbPro dbPro, String tableName, List<Record> recordList) {
        if (recordList == null || recordList.size() == 0) {
            return new int[0];
        }
        Dialect dialect = dbPro.getConfig().getDialect();

        Record record = recordList.get(0);
        Map<String, Object> cols = record.getColumns();
        int index = 0;
        StringBuilder columns = new StringBuilder();
        // the same as the iterator in Dialect.forDbSave() to ensure the order of the columns
        for (Map.Entry<String, Object> e : cols.entrySet()) {
            if (dialect.isOracle()) {    // 支持 oracle 自增主键
                Object value = e.getValue();
                if (value instanceof String && ((String) value).endsWith(".nextval")) {
                    continue;
                }
            }

            if (index++ > 0) {
                columns.append(',');
            }
            columns.append(e.getKey());
        }

        String[] pKeysNoUse = new String[0];
        StringBuilder sql = new StringBuilder();
        List<Object> parasNoUse = new ArrayList<Object>();
        dialect.forDbSave(tableName, pKeysNoUse, record, sql, parasNoUse);
        return batch(dbPro, sql.toString(), columns.toString(), recordList);
    }

    /**
     * Batch update models using the attrs names of the first model in modelList.
     * Ensure all the models can use the same sql as the first model.
     */
    public static int[] batchUpdate(DbPro dbPro, List<? extends Model> modelList) {
        if (modelList == null || modelList.size() == 0) {
            return new int[0];
        }
        Dialect dialect = dbPro.getConfig().getDialect();

        Model model = modelList.get(0);
        Table table = TableMapping.me().getTable(model.getClass());
        String[] pKeys = table.getPrimaryKey();

        Map<String, Object> attrs = getAttrs(model);
        List<String> attrNames = new ArrayList<String>();
        // the same as the iterator in Dialect.forModelSave() to ensure the order of the attrs
        for (Map.Entry<String, Object> e : attrs.entrySet()) {
            String attr = e.getKey();
            if (!dialect.isPrimaryKey(attr, pKeys) && table.hasColumnLabel(attr)) {
                attrNames.add(attr);
            }
        }
        attrNames.addAll(Arrays.asList(pKeys));
        String columns = StrKit.join(attrNames.toArray(new String[0]), ",");

        // update all attrs of the model not use the midifyFlag of every single model
        Set<String> modifyFlag = attrs.keySet();    // model.getModifyFlag();

        StringBuilder sql = new StringBuilder();
        List<Object> parasNoUse = new ArrayList<Object>();
        dialect.forModelUpdate(TableMapping.me().getTable(model.getClass()), attrs, modifyFlag, sql, parasNoUse);
        return batch(dbPro, sql.toString(), columns, modelList);
    }

    /**
     * Batch update records using the columns names of the first record in recordList.
     * Ensure all the records can use the same sql as the first record.
     *
     * @param tableName  the table name
     * @param primaryKey the primary key of the table, composite primary key is separated by comma character: ","
     */
    public static int[] batchUpdate(DbPro dbPro, String tableName, String primaryKey, List<Record> recordList) {
        if (recordList == null || recordList.size() == 0) {
            return new int[0];
        }

        String[] pKeys = primaryKey.split(",");
        Dialect dialect = dbPro.getConfig().getDialect();
        dialect.trimPrimaryKeys(pKeys);

        Record record = recordList.get(0);
        Map<String, Object> cols = record.getColumns();
        List<String> colNames = new ArrayList<String>();
        // the same as the iterator in Dialect.forDbUpdate() to ensure the order of the columns
        for (Map.Entry<String, Object> e : cols.entrySet()) {
            String col = e.getKey();
            if (!dialect.isPrimaryKey(col, pKeys)) {
                colNames.add(col);
            }
        }
        colNames.addAll(Arrays.asList(pKeys));
        String columns = StrKit.join(colNames.toArray(new String[0]), ",");

        Object[] idsNoUse = new Object[pKeys.length];
        StringBuilder sql = new StringBuilder();
        List<Object> parasNoUse = new ArrayList<Object>();
        dialect.forDbUpdate(tableName, pKeys, idsNoUse, record, sql, parasNoUse);
        return batch(dbPro, sql.toString(), columns, recordList);
    }

    /**
     * Batch update records with default primary key, using the columns names of the first record in recordList.
     * Ensure all the records can use the same sql as the first record.
     *
     * @param tableName the table name
     */
    public static int[] batchUpdate(DbPro dbPro, String tableName, List<Record> recordList) {
        return batchUpdate(dbPro, tableName, dbPro.getConfig().getDialect().getDefaultPrimaryKey(), recordList);
    }

    private static void close(Statement st) {
        if (st != null) {
            try {
                st.close();
            } catch (SQLException e) {
                throw new ActiveRecordException(e);
            }
        }
    }

    private static Map<String, Object> getAttrs(Model model) {
        Set<Map.Entry<String, Object>> set = model._getAttrsEntrySet();
        Map<String, Object> mapFromSet = new HashMap<String, Object>(16);
        for (Map.Entry<String, Object> entry : set) {
            mapFromSet.put(entry.getKey(), entry.getValue());
        }
        return mapFromSet;
    }
}

测试

List<Record> list=new ArrayList<>(2);
list.add(new Record().set("id", 3));
list.add(new Record().set("id", 4));

AtomikosPlugin.tx(() -> {
    DbAtomikosKit.batchSave(Db.use(),"test", list);
    DbAtomikosKit.batchSave(Db.use("test1"),"test", list);
    DbAtomikosKit.batchSave(Db.use("test2"),"test", list);
    return false;
});

评论区

北流家园网

2019-06-19 11:14

没看得出优势在哪里

sourceTT

2019-06-19 20:48

@北流家园网 保证多个数据库事务的统一提交及回滚

热门分享

扫码入社