新增多数据源事务传播机制 (#406)

本地多数据源事务传播机制
This commit is contained in:
hzh 2022-12-26 13:54:30 +08:00 committed by GitHub
parent 638b2dc1e6
commit e2e176d5a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 328 additions and 54 deletions

View File

@ -15,6 +15,10 @@
*/ */
package com.baomidou.dynamic.datasource.annotation; package com.baomidou.dynamic.datasource.annotation;
import com.baomidou.dynamic.datasource.tx.DsPropagation;
import java.lang.annotation.*; import java.lang.annotation.*;
/** /**
@ -26,4 +30,9 @@ import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Documented @Documented
public @interface DSTransactional { public @interface DSTransactional {
Class<? extends Throwable>[] rollbackFor() default {Exception.class};
Class<? extends Throwable>[] noRollbackFor() default {};
DsPropagation propagation() default DsPropagation.REQUIRED;
} }

View File

@ -15,39 +15,41 @@
*/ */
package com.baomidou.dynamic.datasource.aop; package com.baomidou.dynamic.datasource.aop;
import com.baomidou.dynamic.datasource.tx.LocalTxUtil; import com.baomidou.dynamic.datasource.annotation.DSTransactional;
import com.baomidou.dynamic.datasource.tx.TransactionContext; import com.baomidou.dynamic.datasource.tx.*;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation; import org.aopalliance.intercept.MethodInvocation;
import org.springframework.util.StringUtils; import java.lang.reflect.Method;
/** /**
* @author funkye * @author funkye
*/ */
@Slf4j @Slf4j
public class DynamicLocalTransactionInterceptor implements MethodInterceptor { public class DynamicLocalTransactionInterceptor implements MethodInterceptor {
private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
@Override @Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable { public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
if (!StringUtils.isEmpty(TransactionContext.getXID())) { Method method = methodInvocation.getMethod();
return methodInvocation.proceed(); final DSTransactional dsTransactional = method.getAnnotation(DSTransactional.class);
}
boolean state = true; TransactionalExecutor transactionalExecutor = new TransactionalExecutor() {
Object o; @Override
LocalTxUtil.startTransaction(); public Object execute() throws Throwable {
try { return methodInvocation.proceed();
o = methodInvocation.proceed();
} catch (Exception e) {
state = false;
throw e;
} finally {
if (state) {
LocalTxUtil.commit();
} else {
LocalTxUtil.rollback();
} }
}
return o; @Override
public TransactionalInfo getTransactionInfo() {
TransactionalInfo transactionInfo = new TransactionalInfo();
transactionInfo.setPropagation(dsTransactional.propagation());
transactionInfo.setNoRollbackFor(dsTransactional.noRollbackFor());
transactionInfo.setRollbackFor(dsTransactional.rollbackFor());
return transactionInfo;
}
};
return transactionalTemplate.execute(transactionalExecutor);
} }
} }

View File

@ -56,8 +56,8 @@ public abstract class AbstractRoutingDataSource extends AbstractDataSource {
} else { } else {
String ds = DynamicDataSourceContextHolder.peek(); String ds = DynamicDataSourceContextHolder.peek();
ds = StringUtils.isEmpty(ds) ? getPrimary() : ds; ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;
ConnectionProxy connection = ConnectionFactory.getConnection(ds); ConnectionProxy connection = ConnectionFactory.getConnection(xid, ds);
return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection; return connection == null ? getConnectionProxy(xid, ds, determineDataSource().getConnection()) : connection;
} }
} }
@ -69,15 +69,15 @@ public abstract class AbstractRoutingDataSource extends AbstractDataSource {
} else { } else {
String ds = DynamicDataSourceContextHolder.peek(); String ds = DynamicDataSourceContextHolder.peek();
ds = StringUtils.isEmpty(ds) ? getPrimary() : ds; ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;
ConnectionProxy connection = ConnectionFactory.getConnection(ds); ConnectionProxy connection = ConnectionFactory.getConnection(xid, ds);
return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection(username, password)) return connection == null ? getConnectionProxy(xid, ds, determineDataSource().getConnection(username, password))
: connection; : connection;
} }
} }
private Connection getConnectionProxy(String ds, Connection connection) { private Connection getConnectionProxy(String xid, String ds, Connection connection) {
ConnectionProxy connectionProxy = new ConnectionProxy(connection, ds); ConnectionProxy connectionProxy = new ConnectionProxy(connection, ds);
ConnectionFactory.putConnection(ds, connectionProxy); ConnectionFactory.putConnection(xid, ds, connectionProxy);
return connectionProxy; return connectionProxy;
} }

View File

@ -0,0 +1,11 @@
package com.baomidou.dynamic.datasource.exception;
public class TransactionException extends RuntimeException {
public TransactionException(String message) {
super(message);
}
public TransactionException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -14,7 +14,6 @@
* limitations under the License. * limitations under the License.
*/ */
package com.baomidou.dynamic.datasource.spring.boot.autoconfigure; package com.baomidou.dynamic.datasource.spring.boot.autoconfigure;
import com.baomidou.dynamic.datasource.DynamicRoutingDataSource; import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.dynamic.datasource.annotation.DSTransactional; import com.baomidou.dynamic.datasource.annotation.DSTransactional;

View File

@ -110,4 +110,4 @@ public class DynamicDataSourceProperties {
*/ */
@NestedConfigurationProperty @NestedConfigurationProperty
private DynamicDatasourceAopProperties aop = new DynamicDatasourceAopProperties(); private DynamicDatasourceAopProperties aop = new DynamicDatasourceAopProperties();
} }

View File

@ -15,6 +15,8 @@
*/ */
package com.baomidou.dynamic.datasource.tx; package com.baomidou.dynamic.datasource.tx;
import org.springframework.util.CollectionUtils;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -24,43 +26,60 @@ import java.util.concurrent.ConcurrentHashMap;
*/ */
public class ConnectionFactory { public class ConnectionFactory {
private static final ThreadLocal<Map<String, ConnectionProxy>> CONNECTION_HOLDER = private static final ThreadLocal<Map<String, Map<String, ConnectionProxy>>> CONNECTION_HOLDER =
new ThreadLocal<Map<String, ConnectionProxy>>() { new ThreadLocal<Map<String, Map<String, ConnectionProxy>>>() {
@Override @Override
protected Map<String, ConnectionProxy> initialValue() { protected Map<String, Map<String, ConnectionProxy>> initialValue() {
return new ConcurrentHashMap<>(8); return new ConcurrentHashMap<>();
} }
}; };
public static void putConnection(String ds, ConnectionProxy connection) { public static void putConnection(String xid, String ds, ConnectionProxy connection) {
Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get(); Map<String, Map<String, ConnectionProxy>> concurrentHashMap = CONNECTION_HOLDER.get();
if (!concurrentHashMap.containsKey(ds)) { Map<String, ConnectionProxy> connectionProxyMap = concurrentHashMap.get(xid);
if (connectionProxyMap == null) {
connectionProxyMap = new ConcurrentHashMap<>();
concurrentHashMap.put(xid, connectionProxyMap);
}
if (!connectionProxyMap.containsKey(ds)) {
try { try {
connection.setAutoCommit(false); connection.setAutoCommit(false);
} catch (SQLException e) { } catch (SQLException e) {
e.printStackTrace(); e.printStackTrace();
} }
concurrentHashMap.put(ds, connection); connectionProxyMap.put(ds, connection);
} }
} }
public static ConnectionProxy getConnection(String ds) { public static ConnectionProxy getConnection(String xid, String ds) {
return CONNECTION_HOLDER.get().get(ds); Map<String, Map<String, ConnectionProxy>> concurrentHashMap = CONNECTION_HOLDER.get();
Map<String, ConnectionProxy> connectionProxyMap = concurrentHashMap.get(xid);
if (CollectionUtils.isEmpty(connectionProxyMap)) {
return null;
}
return connectionProxyMap.get(ds);
} }
public static void notify(Boolean state) throws Exception { public static void notify(String xid, Boolean state) throws Exception {
Exception exception = null; Exception exception = null;
Map<String, Map<String, ConnectionProxy>> concurrentHashMap = CONNECTION_HOLDER.get();
try { try {
Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get(); if (CollectionUtils.isEmpty(concurrentHashMap)) {
for (ConnectionProxy connectionProxy : concurrentHashMap.values()) { return;
}
Map<String, ConnectionProxy> connectionProxyMap = concurrentHashMap.get(xid);
for (ConnectionProxy connectionProxy : connectionProxyMap.values()) {
try { try {
connectionProxy.notify(state); if (connectionProxy != null) {
connectionProxy.notify(state);
}
} catch (SQLException e) { } catch (SQLException e) {
exception = e; exception = e;
} }
} }
} finally { } finally {
CONNECTION_HOLDER.remove(); concurrentHashMap.remove(xid);
if (exception != null) { if (exception != null) {
throw exception; throw exception;
} }

View File

@ -16,7 +16,6 @@
package com.baomidou.dynamic.datasource.tx; package com.baomidou.dynamic.datasource.tx;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.sql.*; import java.sql.*;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;

View File

@ -0,0 +1,18 @@
package com.baomidou.dynamic.datasource.tx;
public enum DsPropagation {
//支持当前事务如果当前没有事务就新建一个事务这是最常见的选择
REQUIRED,
//新建事务如果当前存在事务把当前事务挂起
REQUIRES_NEW,
//以非事务方式执行操作如果当前存在事务就把当前事务挂起
NOT_SUPPORTED,
//支持当前事务如果当前没有事务就以非事务方式执行
SUPPORTS,
//以非事务方式执行如果当前存在事务则抛出异常
NEVER,
//支持当前事务如果当前没有事务就抛出异常
MANDATORY
}

View File

@ -32,22 +32,24 @@ public final class LocalTxUtil {
/** /**
* 手动开启事务 * 手动开启事务
*/ */
public static void startTransaction() { public static String startTransaction() {
if (!StringUtils.isEmpty(TransactionContext.getXID())) { String xid = TransactionContext.getXID();
log.debug("dynamic-datasource exist local tx [{}]", TransactionContext.getXID()); if (!StringUtils.isEmpty(xid)) {
log.debug("dynamic-datasource exist local tx [{}]", xid);
} else { } else {
String xid = UUID.randomUUID().toString(); xid = UUID.randomUUID().toString();
TransactionContext.bind(xid); TransactionContext.bind(xid);
log.debug("dynamic-datasource start local tx [{}]", xid); log.debug("dynamic-datasource start local tx [{}]", xid);
} }
return xid;
} }
/** /**
* 手动提交事务 * 手动提交事务
*/ */
public static void commit() throws Exception { public static void commit(String xid) throws Exception {
try { try {
ConnectionFactory.notify(true); ConnectionFactory.notify(xid, true);
} finally { } finally {
log.debug("dynamic-datasource commit local tx [{}]", TransactionContext.getXID()); log.debug("dynamic-datasource commit local tx [{}]", TransactionContext.getXID());
TransactionContext.remove(); TransactionContext.remove();
@ -57,9 +59,9 @@ public final class LocalTxUtil {
/** /**
* 手动回滚事务 * 手动回滚事务
*/ */
public static void rollback() throws Exception { public static void rollback(String xid) throws Exception {
try { try {
ConnectionFactory.notify(false); ConnectionFactory.notify(xid, false);
} finally { } finally {
log.debug("dynamic-datasource rollback local tx [{}]", TransactionContext.getXID()); log.debug("dynamic-datasource rollback local tx [{}]", TransactionContext.getXID());
TransactionContext.remove(); TransactionContext.remove();

View File

@ -0,0 +1,22 @@
package com.baomidou.dynamic.datasource.tx;
import javax.annotation.Nonnull;
public class SuspendedResourcesHolder {
/**
* The xid
*/
private String xid;
public SuspendedResourcesHolder(String xid) {
if (xid == null) {
throw new IllegalArgumentException("xid must be not null");
}
this.xid = xid;
}
@Nonnull
public String getXid() {
return xid;
}
}

View File

@ -0,0 +1,10 @@
package com.baomidou.dynamic.datasource.tx;
public interface TransactionalExecutor {
Object execute() throws Throwable;
TransactionalInfo getTransactionInfo();
}

View File

@ -0,0 +1,34 @@
package com.baomidou.dynamic.datasource.tx;
public class TransactionalInfo {
Class<? extends Throwable>[] rollbackFor;
Class<? extends Throwable>[] noRollbackFor;
DsPropagation propagation;
public Class<? extends Throwable>[] getRollbackFor() {
return rollbackFor;
}
public void setRollbackFor(Class<? extends Throwable>[] rollbackFor) {
this.rollbackFor = rollbackFor;
}
public Class<? extends Throwable>[] getNoRollbackFor() {
return noRollbackFor;
}
public void setNoRollbackFor(Class<? extends Throwable>[] noRollbackFor) {
this.noRollbackFor = noRollbackFor;
}
public DsPropagation getPropagation() {
return propagation;
}
public void setPropagation(DsPropagation propagation) {
this.propagation = propagation;
}
}

View File

@ -0,0 +1,149 @@
package com.baomidou.dynamic.datasource.tx;
import com.baomidou.dynamic.datasource.exception.TransactionException;
import com.baomidou.mybatisplus.core.toolkit.ArrayUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.util.Objects;
@Slf4j
public class TransactionalTemplate {
public Object execute(TransactionalExecutor transactionalExecutor) throws Throwable {
TransactionalInfo transactionInfo = transactionalExecutor.getTransactionInfo();
DsPropagation propagation = transactionInfo.propagation;
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction()) {
suspendedResourcesHolder = suspend();
}
return transactionalExecutor.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction()) {
suspendedResourcesHolder = suspend();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (!existingTransaction()) {
return transactionalExecutor.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// default
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction()) {
throw new TransactionException("Existing transaction found for transaction marked with propagation never");
} else {
// Execute without transaction and return.
return transactionalExecutor.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
if (!existingTransaction()) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
return doExecute(transactionalExecutor);
} finally {
resume(suspendedResourcesHolder);
}
}
private Object doExecute(TransactionalExecutor transactionalExecutor) throws Throwable {
TransactionalInfo transactionInfo = transactionalExecutor.getTransactionInfo();
if (!StringUtils.isEmpty(TransactionContext.getXID())) {
return transactionalExecutor.execute();
}
boolean state = true;
Object o;
String xid = LocalTxUtil.startTransaction();
try {
o = transactionalExecutor.execute();
} catch (Exception e) {
state = !isRollback(e, transactionInfo);
throw e;
} finally {
if (state) {
LocalTxUtil.commit(xid);
} else {
LocalTxUtil.rollback(xid);
}
}
return o;
}
private boolean isRollback(Throwable e, TransactionalInfo transactionInfo) {
boolean isRollback = true;
Class<? extends Throwable>[] rollbacks = transactionInfo.rollbackFor;
Class<? extends Throwable>[] noRollbackFor = transactionInfo.noRollbackFor;
if (ArrayUtils.isNotEmpty(noRollbackFor)) {
for (Class<? extends Throwable> noRollBack : noRollbackFor) {
int depth = getDepth(e.getClass(), noRollBack);
if (depth >= 0) {
return false;
}
}
}
if (ArrayUtils.isNotEmpty(rollbacks)) {
for (Class<? extends Throwable> rollback : rollbacks) {
int depth = getDepth(e.getClass(), rollback);
if (depth >= 0) {
return isRollback;
}
}
}
return false;
}
private int getDepth(Class<?> exceptionClass, Class<? extends Throwable> rollback) {
if (rollback == Throwable.class || rollback == Exception.class) {
return 0;
}
// If we've gone as far as we can go and haven't found it...
if (exceptionClass == Throwable.class) {
return -1;
}
if (Objects.equals(exceptionClass, rollback)) {
return 0;
}
return getDepth(exceptionClass.getSuperclass(), rollback);
}
private void resume(SuspendedResourcesHolder suspendedResourcesHolder) {
if (suspendedResourcesHolder != null) {
String xid = suspendedResourcesHolder.getXid();
TransactionContext.bind(xid);
}
}
public SuspendedResourcesHolder suspend() {
String xid = TransactionContext.getXID();
if (xid != null) {
if (log.isInfoEnabled()) {
log.info("Suspending current transaction, xid = {}", xid);
}
TransactionContext.unbind(xid);
return new SuspendedResourcesHolder(xid);
} else {
return null;
}
}
public boolean existingTransaction() {
return !StringUtils.isEmpty(TransactionContext.getXID());
}
}