分布式事务改进一下

This commit is contained in:
hubin 2019-04-20 16:23:41 +08:00
parent d1510d1314
commit eeddcf313c
19 changed files with 80 additions and 132 deletions

View File

@ -37,9 +37,6 @@ ext {
"spring-tx" : "org.springframework:spring-tx:${springVersion}",
"spring-web" : "org.springframework:spring-web:${springVersion}",
"spring-aop" : "org.springframework:spring-aop:${springVersion}",
"spring-rabbit" : "org.springframework.amqp:spring-rabbit:2.0.3.RELEASE",
"spring-data-redis" : "org.springframework.data:spring-data-redis:2.0.2.RELEASE",
"jackson-databind" : "com.fasterxml.jackson.core:jackson-databind:2.8.9",
"aspectjrt" : "org.aspectj:aspectjrt:1.8.13",
"cglib" : "cglib:cglib:3.2.6",
"lombok" : "org.projectlombok:lombok:1.18.4",

View File

@ -6,11 +6,9 @@ dependencies {
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}"
annotationProcessor "org.springframework.boot:spring-boot-autoconfigure-processor:${springBootVersion}"
api 'org.springframework.boot:spring-boot-autoconfigure'
api 'org.springframework.boot:spring-boot-starter-jdbc'
implementation "${lib.aspectjrt}"
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation 'org.springframework.boot:spring-boot-configuration-processor'
implementation 'org.springframework.boot:spring-boot-autoconfigure-processor'
testImplementation 'org.springframework.boot:spring-boot-starter-test'

View File

@ -1,6 +1,7 @@
package com.baomidou.mybatisplus.rmt.config;
package com.baomidou.mybatisplus.dts;
import com.baomidou.mybatisplus.rmt.mq.RabbitConfiguration;
import com.baomidou.mybatisplus.dts.rabbit.config.RmtAutoConfiguration;
import com.baomidou.mybatisplus.dts.rabbit.mq.RabbitConfiguration;
import org.springframework.context.annotation.Import;
import java.lang.annotation.Documented;
@ -19,7 +20,7 @@ import java.lang.annotation.Target;
@Retention(value = java.lang.annotation.RetentionPolicy.RUNTIME)
@Target(value = { java.lang.annotation.ElementType.TYPE })
@Documented
@Import({RabbitConfiguration.class})
@Import({RabbitConfiguration.class, RmtAutoConfiguration.class})
public @interface EnableRmtRabbit {
}

View File

@ -1,4 +1,4 @@
package com.baomidou.mybatisplus.rmt;
package com.baomidou.mybatisplus.dts;
/**
* <p>
@ -12,21 +12,21 @@ public interface RmtConstants {
/**
* 队列配置
*/
String EXCHANGE = "rmt.exchange";
String QUEUE = "rmt.queue";
String ROUTING_KEY = "rmt.routing.key";
String EXCHANGE = "rabbit-exchange";
String QUEUE = "rabbit-queue";
String ROUTING_KEY = "rabbit-routing-key";
/**
* 死信队列配置
*/
String DL_EXCHANGE = "rmt.dl.exchange";
String DL_QUEUE = "rmt.dl.queue";
String DL_ROUTING_KEY = "dlx.routing.key";
String DL_EXCHANGE = "rabbit-dl-exchange";
String DL_QUEUE = "rabbit-dl-queue";
String DL_ROUTING_KEY = "rabbit-dl-routing-key";
/**
* 默认 KEY
*/
String KEY = "rmt.key";
String KEY = "rabbit.key";
/**
* 消息重发计数
*/

View File

@ -1,4 +1,4 @@
package com.baomidou.mybatisplus.rmt;
package com.baomidou.mybatisplus.dts.rabbit;
import lombok.Data;

View File

@ -0,0 +1,42 @@
package com.baomidou.mybatisplus.dts.rabbit.config;
import com.baomidou.mybatisplus.dts.rabbit.coordinator.RedisRmtCoordinator;
import com.baomidou.mybatisplus.dts.rabbit.mq.RabbitTransactionalAspect;
import com.baomidou.mybatisplus.dts.rabbit.parser.JacksonRmtParser;
import com.baomidou.mybatisplus.dts.rabbit.sender.RabbitRmtSender;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Rabbit MQ 可靠消息配置
*
* @author jobob
* @since 2019-04-19
*/
@Configuration
public class RmtAutoConfiguration {
@Bean
public RedisRmtCoordinator redisRmtCoordinator() {
return new RedisRmtCoordinator();
}
@Bean
public JacksonRmtParser rmtParser() {
return new JacksonRmtParser();
}
@Bean
public RabbitRmtSender rmtSender() {
return new RabbitRmtSender();
}
/**
* 配置可靠消息事务发送者
*/
@Bean
public RabbitTransactionalAspect rabbitTransactionalAspect() {
return new RabbitTransactionalAspect();
}
}

View File

@ -1,6 +1,6 @@
package com.baomidou.mybatisplus.rmt.coordinator;
package com.baomidou.mybatisplus.dts.rabbit.coordinator;
import com.baomidou.mybatisplus.rmt.RmtMeta;
import com.baomidou.mybatisplus.dts.rabbit.RmtMeta;
import java.util.Collection;

View File

@ -1,7 +1,7 @@
package com.baomidou.mybatisplus.rmt.coordinator;
package com.baomidou.mybatisplus.dts.rabbit.coordinator;
import com.baomidou.mybatisplus.rmt.RmtConstants;
import com.baomidou.mybatisplus.rmt.RmtMeta;
import com.baomidou.mybatisplus.dts.rabbit.RmtConstants;
import com.baomidou.mybatisplus.dts.rabbit.RmtMeta;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.HashOperations;

View File

@ -1,4 +1,4 @@
package com.baomidou.mybatisplus.rmt.listener;
package com.baomidou.mybatisplus.dts.rabbit.listener;
/**
* 可靠消息事务监听

View File

@ -1,7 +1,7 @@
package com.baomidou.mybatisplus.rmt.listener;
package com.baomidou.mybatisplus.dts.rabbit.listener;
import com.baomidou.mybatisplus.rmt.RmtConstants;
import com.baomidou.mybatisplus.dts.rabbit.RmtConstants;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;

View File

@ -1,9 +1,6 @@
package com.baomidou.mybatisplus.rmt.mq;
package com.baomidou.mybatisplus.dts.rabbit.mq;
import com.baomidou.mybatisplus.rmt.RmtConstants;
import com.baomidou.mybatisplus.rmt.coordinator.RedisRmtCoordinator;
import com.baomidou.mybatisplus.rmt.parser.JacksonRmtParser;
import com.baomidou.mybatisplus.rmt.sender.RabbitRmtSender;
import com.baomidou.mybatisplus.dts.rabbit.RmtConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
@ -12,12 +9,10 @@ import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import javax.annotation.PostConstruct;
import java.util.HashMap;
@ -32,18 +27,11 @@ import java.util.Map;
@Configuration
@ConditionalOnClass(EnableRabbit.class)
public class RabbitConfiguration {
@Autowired
protected PlatformTransactionManager transactionManager;
@Autowired
protected RabbitTemplate rabbitTemplate;
@Autowired
protected RabbitAdmin rabbitAdmin;
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
@ -51,14 +39,9 @@ public class RabbitConfiguration {
@PostConstruct
protected void init() {
/**
* make rabbit template to support transactions
*/
// make rabbit template to support transactions
rabbitTemplate.setChannelTransacted(true);
/**
* init queue
*/
// define deadletter exchange and queue
rabbitAdmin.declareExchange(new DirectExchange(RmtConstants.DL_EXCHANGE, true, false));
rabbitAdmin.declareQueue(new Queue(RmtConstants.DL_QUEUE, true, false, false, null));
@ -74,31 +57,4 @@ public class RabbitConfiguration {
// declare binding
rabbitAdmin.declareBinding(new Binding(RmtConstants.QUEUE, Binding.DestinationType.QUEUE, RmtConstants.EXCHANGE, RmtConstants.ROUTING_KEY, null));
}
/**
* <p>
* 配置可靠消息事务发送者
* </p>
*
* @return
*/
@Bean
public RabbitTransactionalAspect rabbitTransactionalAspect() {
return new RabbitTransactionalAspect();
}
@Bean
public RedisRmtCoordinator redisRmtCoordinator() {
return new RedisRmtCoordinator();
}
@Bean
public JacksonRmtParser rmtParser() {
return new JacksonRmtParser();
}
@Bean
public RabbitRmtSender rmtSender() {
return new RabbitRmtSender();
}
}

View File

@ -1,10 +1,10 @@
package com.baomidou.mybatisplus.rmt.mq;
package com.baomidou.mybatisplus.dts.rabbit.mq;
import com.baomidou.mybatisplus.rmt.RmtConstants;
import com.baomidou.mybatisplus.rmt.RmtMeta;
import com.baomidou.mybatisplus.rmt.annotation.RmTransactional;
import com.baomidou.mybatisplus.rmt.coordinator.IRmtCoordinator;
import com.baomidou.mybatisplus.rmt.sender.IRmtSender;
import com.baomidou.mybatisplus.dts.rabbit.RmtConstants;
import com.baomidou.mybatisplus.dts.rabbit.RmtMeta;
import com.baomidou.mybatisplus.dts.annotation.RmTransactional;
import com.baomidou.mybatisplus.dts.rabbit.coordinator.IRmtCoordinator;
import com.baomidou.mybatisplus.dts.rabbit.sender.IRmtSender;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;

View File

@ -1,4 +1,4 @@
package com.baomidou.mybatisplus.rmt.parser;
package com.baomidou.mybatisplus.dts.rabbit.parser;
import org.springframework.amqp.support.converter.MessageConverter;

View File

@ -1,4 +1,4 @@
package com.baomidou.mybatisplus.rmt.parser;
package com.baomidou.mybatisplus.dts.rabbit.parser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

View File

@ -1,6 +1,6 @@
package com.baomidou.mybatisplus.rmt.sender;
package com.baomidou.mybatisplus.dts.rabbit.sender;
import com.baomidou.mybatisplus.rmt.RmtMeta;
import com.baomidou.mybatisplus.dts.rabbit.RmtMeta;
/**
* <p>

View File

@ -1,7 +1,7 @@
package com.baomidou.mybatisplus.rmt.sender;
package com.baomidou.mybatisplus.dts.rabbit.sender;
import com.baomidou.mybatisplus.rmt.RmtMeta;
import com.baomidou.mybatisplus.rmt.parser.IRmtParser;
import com.baomidou.mybatisplus.dts.rabbit.RmtMeta;
import com.baomidou.mybatisplus.dts.rabbit.parser.IRmtParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;

View File

@ -1,35 +0,0 @@
package com.baomidou.mybatisplus.rmt.annotation;
import com.baomidou.mybatisplus.rmt.RmtConstants;
import java.lang.annotation.*;
/**
* <p>
* 可靠消息事务 reliable message transactional
* </p>
*
* @author jobob
* @since 2019-04-17
*/
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD, ElementType.TYPE })
@Documented
public @interface RmTransactional {
/**
* 业务值
*/
String value() default "";
/**
* 交换器
*/
String exchange() default RmtConstants.EXCHANGE;
/**
* 路由 KEY
*/
String routingKey() default RmtConstants.KEY;
}

View File

@ -1,11 +0,0 @@
package com.baomidou.mybatisplus.rmt.daemon;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RmtResendProcess {
// 处理任务重发
}

View File

@ -1,8 +1,8 @@
rootProject.name = 'mybatis-plus-root'
include 'mybatis-plus'
include 'mybatis-plus-core'
include 'mybatis-plus-dts'
include 'mybatis-plus-annotation'
include 'mybatis-plus-extension'
include 'mybatis-plus-generator'
include 'mybatis-plus-rmt'
include 'mybatis-plus-boot-starter'