From e2e176d5a8b17c6e3a70bcb66ffc86dca6b30a19 Mon Sep 17 00:00:00 2001 From: hzh <57568417+zhaohaoh@users.noreply.github.com> Date: Mon, 26 Dec 2022 13:54:30 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=A4=9A=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=BA=90=E4=BA=8B=E5=8A=A1=E4=BC=A0=E6=92=AD=E6=9C=BA=E5=88=B6?= =?UTF-8?q?=20(#406)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 本地多数据源事务传播机制 --- .../annotation/DSTransactional.java | 9 ++ .../DynamicLocalTransactionInterceptor.java | 46 +++--- .../ds/AbstractRoutingDataSource.java | 12 +- .../exception/TransactionException.java | 11 ++ .../DynamicDataSourceAutoConfiguration.java | 1 - .../DynamicDataSourceProperties.java | 2 +- .../datasource/tx/ConnectionFactory.java | 49 ++++-- .../datasource/tx/ConnectionProxy.java | 1 - .../dynamic/datasource/tx/DsPropagation.java | 18 +++ .../dynamic/datasource/tx/LocalTxUtil.java | 18 ++- .../tx/SuspendedResourcesHolder.java | 22 +++ .../datasource/tx/TransactionalExecutor.java | 10 ++ .../datasource/tx/TransactionalInfo.java | 34 ++++ .../datasource/tx/TransactionalTemplate.java | 149 ++++++++++++++++++ 14 files changed, 328 insertions(+), 54 deletions(-) create mode 100644 src/main/java/com/baomidou/dynamic/datasource/exception/TransactionException.java create mode 100644 src/main/java/com/baomidou/dynamic/datasource/tx/DsPropagation.java create mode 100644 src/main/java/com/baomidou/dynamic/datasource/tx/SuspendedResourcesHolder.java create mode 100644 src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalExecutor.java create mode 100644 src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalInfo.java create mode 100644 src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalTemplate.java diff --git a/src/main/java/com/baomidou/dynamic/datasource/annotation/DSTransactional.java b/src/main/java/com/baomidou/dynamic/datasource/annotation/DSTransactional.java index 5d2fec6..ce9ae5b 100644 --- a/src/main/java/com/baomidou/dynamic/datasource/annotation/DSTransactional.java +++ b/src/main/java/com/baomidou/dynamic/datasource/annotation/DSTransactional.java @@ -15,6 +15,10 @@ */ package com.baomidou.dynamic.datasource.annotation; + + +import com.baomidou.dynamic.datasource.tx.DsPropagation; + import java.lang.annotation.*; /** @@ -26,4 +30,9 @@ import java.lang.annotation.*; @Retention(RetentionPolicy.RUNTIME) @Documented public @interface DSTransactional { + Class[] rollbackFor() default {Exception.class}; + + Class[] noRollbackFor() default {}; + + DsPropagation propagation() default DsPropagation.REQUIRED; } diff --git a/src/main/java/com/baomidou/dynamic/datasource/aop/DynamicLocalTransactionInterceptor.java b/src/main/java/com/baomidou/dynamic/datasource/aop/DynamicLocalTransactionInterceptor.java index e4072ef..a920875 100644 --- a/src/main/java/com/baomidou/dynamic/datasource/aop/DynamicLocalTransactionInterceptor.java +++ b/src/main/java/com/baomidou/dynamic/datasource/aop/DynamicLocalTransactionInterceptor.java @@ -15,39 +15,41 @@ */ package com.baomidou.dynamic.datasource.aop; -import com.baomidou.dynamic.datasource.tx.LocalTxUtil; -import com.baomidou.dynamic.datasource.tx.TransactionContext; +import com.baomidou.dynamic.datasource.annotation.DSTransactional; +import com.baomidou.dynamic.datasource.tx.*; import lombok.extern.slf4j.Slf4j; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; -import org.springframework.util.StringUtils; +import java.lang.reflect.Method; /** * @author funkye */ @Slf4j public class DynamicLocalTransactionInterceptor implements MethodInterceptor { + private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate(); @Override - public Object invoke(MethodInvocation methodInvocation) throws Throwable { - if (!StringUtils.isEmpty(TransactionContext.getXID())) { - return methodInvocation.proceed(); - } - boolean state = true; - Object o; - LocalTxUtil.startTransaction(); - try { - o = methodInvocation.proceed(); - } catch (Exception e) { - state = false; - throw e; - } finally { - if (state) { - LocalTxUtil.commit(); - } else { - LocalTxUtil.rollback(); + public Object invoke(final MethodInvocation methodInvocation) throws Throwable { + Method method = methodInvocation.getMethod(); + final DSTransactional dsTransactional = method.getAnnotation(DSTransactional.class); + + TransactionalExecutor transactionalExecutor = new TransactionalExecutor() { + @Override + public Object execute() throws Throwable { + return methodInvocation.proceed(); } - } - return o; + + @Override + public TransactionalInfo getTransactionInfo() { + TransactionalInfo transactionInfo = new TransactionalInfo(); + transactionInfo.setPropagation(dsTransactional.propagation()); + transactionInfo.setNoRollbackFor(dsTransactional.noRollbackFor()); + transactionInfo.setRollbackFor(dsTransactional.rollbackFor()); + return transactionInfo; + } + }; + return transactionalTemplate.execute(transactionalExecutor); } + } diff --git a/src/main/java/com/baomidou/dynamic/datasource/ds/AbstractRoutingDataSource.java b/src/main/java/com/baomidou/dynamic/datasource/ds/AbstractRoutingDataSource.java index 2c80282..f04a64d 100644 --- a/src/main/java/com/baomidou/dynamic/datasource/ds/AbstractRoutingDataSource.java +++ b/src/main/java/com/baomidou/dynamic/datasource/ds/AbstractRoutingDataSource.java @@ -56,8 +56,8 @@ public abstract class AbstractRoutingDataSource extends AbstractDataSource { } else { String ds = DynamicDataSourceContextHolder.peek(); ds = StringUtils.isEmpty(ds) ? getPrimary() : ds; - ConnectionProxy connection = ConnectionFactory.getConnection(ds); - return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection; + ConnectionProxy connection = ConnectionFactory.getConnection(xid, ds); + return connection == null ? getConnectionProxy(xid, ds, determineDataSource().getConnection()) : connection; } } @@ -69,15 +69,15 @@ public abstract class AbstractRoutingDataSource extends AbstractDataSource { } else { String ds = DynamicDataSourceContextHolder.peek(); ds = StringUtils.isEmpty(ds) ? getPrimary() : ds; - ConnectionProxy connection = ConnectionFactory.getConnection(ds); - return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection(username, password)) + ConnectionProxy connection = ConnectionFactory.getConnection(xid, ds); + return connection == null ? getConnectionProxy(xid, ds, determineDataSource().getConnection(username, password)) : connection; } } - private Connection getConnectionProxy(String ds, Connection connection) { + private Connection getConnectionProxy(String xid, String ds, Connection connection) { ConnectionProxy connectionProxy = new ConnectionProxy(connection, ds); - ConnectionFactory.putConnection(ds, connectionProxy); + ConnectionFactory.putConnection(xid, ds, connectionProxy); return connectionProxy; } diff --git a/src/main/java/com/baomidou/dynamic/datasource/exception/TransactionException.java b/src/main/java/com/baomidou/dynamic/datasource/exception/TransactionException.java new file mode 100644 index 0000000..b7fe87e --- /dev/null +++ b/src/main/java/com/baomidou/dynamic/datasource/exception/TransactionException.java @@ -0,0 +1,11 @@ +package com.baomidou.dynamic.datasource.exception; + +public class TransactionException extends RuntimeException { + public TransactionException(String message) { + super(message); + } + + public TransactionException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/com/baomidou/dynamic/datasource/spring/boot/autoconfigure/DynamicDataSourceAutoConfiguration.java b/src/main/java/com/baomidou/dynamic/datasource/spring/boot/autoconfigure/DynamicDataSourceAutoConfiguration.java index 5a1f68c..459ab4d 100644 --- a/src/main/java/com/baomidou/dynamic/datasource/spring/boot/autoconfigure/DynamicDataSourceAutoConfiguration.java +++ b/src/main/java/com/baomidou/dynamic/datasource/spring/boot/autoconfigure/DynamicDataSourceAutoConfiguration.java @@ -14,7 +14,6 @@ * limitations under the License. */ package com.baomidou.dynamic.datasource.spring.boot.autoconfigure; - import com.baomidou.dynamic.datasource.DynamicRoutingDataSource; import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.dynamic.datasource.annotation.DSTransactional; diff --git a/src/main/java/com/baomidou/dynamic/datasource/spring/boot/autoconfigure/DynamicDataSourceProperties.java b/src/main/java/com/baomidou/dynamic/datasource/spring/boot/autoconfigure/DynamicDataSourceProperties.java index a318571..d020ff3 100644 --- a/src/main/java/com/baomidou/dynamic/datasource/spring/boot/autoconfigure/DynamicDataSourceProperties.java +++ b/src/main/java/com/baomidou/dynamic/datasource/spring/boot/autoconfigure/DynamicDataSourceProperties.java @@ -110,4 +110,4 @@ public class DynamicDataSourceProperties { */ @NestedConfigurationProperty private DynamicDatasourceAopProperties aop = new DynamicDatasourceAopProperties(); -} +} \ No newline at end of file diff --git a/src/main/java/com/baomidou/dynamic/datasource/tx/ConnectionFactory.java b/src/main/java/com/baomidou/dynamic/datasource/tx/ConnectionFactory.java index 2f20f3d..52604ff 100644 --- a/src/main/java/com/baomidou/dynamic/datasource/tx/ConnectionFactory.java +++ b/src/main/java/com/baomidou/dynamic/datasource/tx/ConnectionFactory.java @@ -15,6 +15,8 @@ */ package com.baomidou.dynamic.datasource.tx; +import org.springframework.util.CollectionUtils; + import java.sql.SQLException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -24,43 +26,60 @@ import java.util.concurrent.ConcurrentHashMap; */ public class ConnectionFactory { - private static final ThreadLocal> CONNECTION_HOLDER = - new ThreadLocal>() { + private static final ThreadLocal>> CONNECTION_HOLDER = + new ThreadLocal>>() { @Override - protected Map initialValue() { - return new ConcurrentHashMap<>(8); + protected Map> initialValue() { + return new ConcurrentHashMap<>(); } }; - public static void putConnection(String ds, ConnectionProxy connection) { - Map concurrentHashMap = CONNECTION_HOLDER.get(); - if (!concurrentHashMap.containsKey(ds)) { + public static void putConnection(String xid, String ds, ConnectionProxy connection) { + Map> concurrentHashMap = CONNECTION_HOLDER.get(); + Map connectionProxyMap = concurrentHashMap.get(xid); + if (connectionProxyMap == null) { + connectionProxyMap = new ConcurrentHashMap<>(); + concurrentHashMap.put(xid, connectionProxyMap); + } + if (!connectionProxyMap.containsKey(ds)) { try { connection.setAutoCommit(false); } catch (SQLException e) { e.printStackTrace(); } - concurrentHashMap.put(ds, connection); + connectionProxyMap.put(ds, connection); } } - public static ConnectionProxy getConnection(String ds) { - return CONNECTION_HOLDER.get().get(ds); + public static ConnectionProxy getConnection(String xid, String ds) { + Map> concurrentHashMap = CONNECTION_HOLDER.get(); + Map connectionProxyMap = concurrentHashMap.get(xid); + if (CollectionUtils.isEmpty(connectionProxyMap)) { + return null; + } + return connectionProxyMap.get(ds); } - public static void notify(Boolean state) throws Exception { + public static void notify(String xid, Boolean state) throws Exception { Exception exception = null; + Map> concurrentHashMap = CONNECTION_HOLDER.get(); try { - Map concurrentHashMap = CONNECTION_HOLDER.get(); - for (ConnectionProxy connectionProxy : concurrentHashMap.values()) { + if (CollectionUtils.isEmpty(concurrentHashMap)) { + return; + } + Map connectionProxyMap = concurrentHashMap.get(xid); + for (ConnectionProxy connectionProxy : connectionProxyMap.values()) { try { - connectionProxy.notify(state); + if (connectionProxy != null) { + connectionProxy.notify(state); + } } catch (SQLException e) { exception = e; } + } } finally { - CONNECTION_HOLDER.remove(); + concurrentHashMap.remove(xid); if (exception != null) { throw exception; } diff --git a/src/main/java/com/baomidou/dynamic/datasource/tx/ConnectionProxy.java b/src/main/java/com/baomidou/dynamic/datasource/tx/ConnectionProxy.java index 3277d48..0d79c77 100644 --- a/src/main/java/com/baomidou/dynamic/datasource/tx/ConnectionProxy.java +++ b/src/main/java/com/baomidou/dynamic/datasource/tx/ConnectionProxy.java @@ -16,7 +16,6 @@ package com.baomidou.dynamic.datasource.tx; import lombok.extern.slf4j.Slf4j; - import java.sql.*; import java.util.Map; import java.util.Properties; diff --git a/src/main/java/com/baomidou/dynamic/datasource/tx/DsPropagation.java b/src/main/java/com/baomidou/dynamic/datasource/tx/DsPropagation.java new file mode 100644 index 0000000..a64cdb2 --- /dev/null +++ b/src/main/java/com/baomidou/dynamic/datasource/tx/DsPropagation.java @@ -0,0 +1,18 @@ + +package com.baomidou.dynamic.datasource.tx; + + +public enum DsPropagation { + //支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。 + REQUIRED, + //新建事务,如果当前存在事务,把当前事务挂起。 + REQUIRES_NEW, + //以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。 + NOT_SUPPORTED, + //支持当前事务,如果当前没有事务,就以非事务方式执行。 + SUPPORTS, + //以非事务方式执行,如果当前存在事务,则抛出异常。 + NEVER, + //支持当前事务,如果当前没有事务,就抛出异常。 + MANDATORY +} diff --git a/src/main/java/com/baomidou/dynamic/datasource/tx/LocalTxUtil.java b/src/main/java/com/baomidou/dynamic/datasource/tx/LocalTxUtil.java index 3cbc6c5..671f446 100644 --- a/src/main/java/com/baomidou/dynamic/datasource/tx/LocalTxUtil.java +++ b/src/main/java/com/baomidou/dynamic/datasource/tx/LocalTxUtil.java @@ -32,22 +32,24 @@ public final class LocalTxUtil { /** * 手动开启事务 */ - public static void startTransaction() { - if (!StringUtils.isEmpty(TransactionContext.getXID())) { - log.debug("dynamic-datasource exist local tx [{}]", TransactionContext.getXID()); + public static String startTransaction() { + String xid = TransactionContext.getXID(); + if (!StringUtils.isEmpty(xid)) { + log.debug("dynamic-datasource exist local tx [{}]", xid); } else { - String xid = UUID.randomUUID().toString(); + xid = UUID.randomUUID().toString(); TransactionContext.bind(xid); log.debug("dynamic-datasource start local tx [{}]", xid); } + return xid; } /** * 手动提交事务 */ - public static void commit() throws Exception { + public static void commit(String xid) throws Exception { try { - ConnectionFactory.notify(true); + ConnectionFactory.notify(xid, true); } finally { log.debug("dynamic-datasource commit local tx [{}]", TransactionContext.getXID()); TransactionContext.remove(); @@ -57,9 +59,9 @@ public final class LocalTxUtil { /** * 手动回滚事务 */ - public static void rollback() throws Exception { + public static void rollback(String xid) throws Exception { try { - ConnectionFactory.notify(false); + ConnectionFactory.notify(xid, false); } finally { log.debug("dynamic-datasource rollback local tx [{}]", TransactionContext.getXID()); TransactionContext.remove(); diff --git a/src/main/java/com/baomidou/dynamic/datasource/tx/SuspendedResourcesHolder.java b/src/main/java/com/baomidou/dynamic/datasource/tx/SuspendedResourcesHolder.java new file mode 100644 index 0000000..f9a58c7 --- /dev/null +++ b/src/main/java/com/baomidou/dynamic/datasource/tx/SuspendedResourcesHolder.java @@ -0,0 +1,22 @@ +package com.baomidou.dynamic.datasource.tx; + +import javax.annotation.Nonnull; + +public class SuspendedResourcesHolder { + /** + * The xid + */ + private String xid; + + public SuspendedResourcesHolder(String xid) { + if (xid == null) { + throw new IllegalArgumentException("xid must be not null"); + } + this.xid = xid; + } + + @Nonnull + public String getXid() { + return xid; + } +} diff --git a/src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalExecutor.java b/src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalExecutor.java new file mode 100644 index 0000000..f741a09 --- /dev/null +++ b/src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalExecutor.java @@ -0,0 +1,10 @@ +package com.baomidou.dynamic.datasource.tx; + + + +public interface TransactionalExecutor { + + Object execute() throws Throwable; + + TransactionalInfo getTransactionInfo(); +} diff --git a/src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalInfo.java b/src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalInfo.java new file mode 100644 index 0000000..448dac5 --- /dev/null +++ b/src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalInfo.java @@ -0,0 +1,34 @@ +package com.baomidou.dynamic.datasource.tx; + +public class TransactionalInfo { + + Class[] rollbackFor; + + Class[] noRollbackFor; + + DsPropagation propagation; + + public Class[] getRollbackFor() { + return rollbackFor; + } + + public void setRollbackFor(Class[] rollbackFor) { + this.rollbackFor = rollbackFor; + } + + public Class[] getNoRollbackFor() { + return noRollbackFor; + } + + public void setNoRollbackFor(Class[] noRollbackFor) { + this.noRollbackFor = noRollbackFor; + } + + public DsPropagation getPropagation() { + return propagation; + } + + public void setPropagation(DsPropagation propagation) { + this.propagation = propagation; + } +} diff --git a/src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalTemplate.java b/src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalTemplate.java new file mode 100644 index 0000000..e37a0ba --- /dev/null +++ b/src/main/java/com/baomidou/dynamic/datasource/tx/TransactionalTemplate.java @@ -0,0 +1,149 @@ +package com.baomidou.dynamic.datasource.tx; + +import com.baomidou.dynamic.datasource.exception.TransactionException; +import com.baomidou.mybatisplus.core.toolkit.ArrayUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; + +import java.util.Objects; + +@Slf4j +public class TransactionalTemplate { + + public Object execute(TransactionalExecutor transactionalExecutor) throws Throwable { + TransactionalInfo transactionInfo = transactionalExecutor.getTransactionInfo(); + DsPropagation propagation = transactionInfo.propagation; + SuspendedResourcesHolder suspendedResourcesHolder = null; + try { + switch (propagation) { + case NOT_SUPPORTED: + // If transaction is existing, suspend it. + if (existingTransaction()) { + suspendedResourcesHolder = suspend(); + } + return transactionalExecutor.execute(); + case REQUIRES_NEW: + // If transaction is existing, suspend it, and then begin new transaction. + if (existingTransaction()) { + suspendedResourcesHolder = suspend(); + } + // Continue and execute with new transaction + break; + case SUPPORTS: + // If transaction is not existing, execute without transaction. + if (!existingTransaction()) { + return transactionalExecutor.execute(); + } + // Continue and execute with new transaction + break; + case REQUIRED: + // default + break; + case NEVER: + // If transaction is existing, throw exception. + if (existingTransaction()) { + throw new TransactionException("Existing transaction found for transaction marked with propagation never"); + } else { + // Execute without transaction and return. + return transactionalExecutor.execute(); + } + case MANDATORY: + // If transaction is not existing, throw exception. + if (!existingTransaction()) { + throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'"); + } + // Continue and execute with current transaction. + break; + default: + throw new TransactionException("Not Supported Propagation:" + propagation); + } + return doExecute(transactionalExecutor); + } finally { + resume(suspendedResourcesHolder); + } + } + + private Object doExecute(TransactionalExecutor transactionalExecutor) throws Throwable { + TransactionalInfo transactionInfo = transactionalExecutor.getTransactionInfo(); + if (!StringUtils.isEmpty(TransactionContext.getXID())) { + return transactionalExecutor.execute(); + } + boolean state = true; + Object o; + String xid = LocalTxUtil.startTransaction(); + try { + o = transactionalExecutor.execute(); + } catch (Exception e) { + state = !isRollback(e, transactionInfo); + throw e; + } finally { + if (state) { + LocalTxUtil.commit(xid); + } else { + LocalTxUtil.rollback(xid); + } + } + return o; + } + + private boolean isRollback(Throwable e, TransactionalInfo transactionInfo) { + boolean isRollback = true; + Class[] rollbacks = transactionInfo.rollbackFor; + Class[] noRollbackFor = transactionInfo.noRollbackFor; + if (ArrayUtils.isNotEmpty(noRollbackFor)) { + for (Class noRollBack : noRollbackFor) { + int depth = getDepth(e.getClass(), noRollBack); + if (depth >= 0) { + return false; + } + } + } + if (ArrayUtils.isNotEmpty(rollbacks)) { + for (Class rollback : rollbacks) { + int depth = getDepth(e.getClass(), rollback); + if (depth >= 0) { + return isRollback; + } + } + } + return false; + } + + private int getDepth(Class exceptionClass, Class rollback) { + if (rollback == Throwable.class || rollback == Exception.class) { + return 0; + } + // If we've gone as far as we can go and haven't found it... + if (exceptionClass == Throwable.class) { + return -1; + } + if (Objects.equals(exceptionClass, rollback)) { + return 0; + } + return getDepth(exceptionClass.getSuperclass(), rollback); + } + + private void resume(SuspendedResourcesHolder suspendedResourcesHolder) { + if (suspendedResourcesHolder != null) { + String xid = suspendedResourcesHolder.getXid(); + TransactionContext.bind(xid); + } + } + + public SuspendedResourcesHolder suspend() { + String xid = TransactionContext.getXID(); + if (xid != null) { + if (log.isInfoEnabled()) { + log.info("Suspending current transaction, xid = {}", xid); + } + TransactionContext.unbind(xid); + return new SuspendedResourcesHolder(xid); + } else { + return null; + } + } + + public boolean existingTransaction() { + return !StringUtils.isEmpty(TransactionContext.getXID()); + } +}