以下是一个基于 Spring Boot + RocketMQ 的完整分布式事务实战 Demo,包含事务消息、本地事务、自动重试、死信队列(DLQ) 等核心机制。代码已充分注释,可直接运行。
一、项目结构
src/main/java
├── com.example.rocketmq
│ ├── controller
│ │ └── OrderController.java
│ ├── model
│ │ ├── Order.java
│ │ ├── OrderRequest.java
│ ├── repository
│ │ ├── OrderRepository.java
│ ├── service
│ │ ├── InventoryService.java
│ │ ├── OrderService.java
│ │ ├── PaymentService.java
│ ├── listener
│ │ └── OrderConsumer.java
│ └── RocketMQConfig.java
├── application.yml
└── pom.xml
二、依赖配置(pom.xml)
<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RocketMQ Spring Boot Starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.11.0</version></dependency><!-- MySQL Driver --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><!-- Spring Data JPA --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency>
</dependencies>
三、配置文件(application.yml)
# RocketMQ配置
rocketmq:producer:name-server: localhost:9876default-topic: order_topicconsumer:name-server: localhost:9876default-topic: order_topicconsumer-group: order_consumer_groupacknowledge-mode: AUTOmax-reconsume-times: 5 # 最大重试次数broker:role: SYNC_MASTER # 同步复制模式store-path-commit-log: /data/rocketmq/commitlogstore-path-consume-queue: /data/rocketmq/consumequeue# 数据库配置
spring:datasource:url: jdbc:mysql://localhost:3306/rocketmq_db?useSSL=false&serverTimezone=UTCusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driverhikari:maximum-pool-size: 20jpa:hibernate:ddl-auto: updateshow-sql: true
四、核心代码实现
1. 实体类(Order.java)
@Entity
@Table(name = "orders")
@Data
public class Order {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String userId;private BigDecimal amount;private String sku;private Integer status; // 0-待支付,1-已支付,2-已发货
}
2. 生产者代码(OrderService.java)
@Service
public class OrderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate InventoryService inventoryService;@Autowiredprivate PaymentService paymentService;@Transactional // 本地事务public void createOrder(OrderRequest request) {// 1. 扣减库存(本地事务)inventoryService.deduct(request.getSku());// 2. 发送事务消息(与本地事务绑定)rocketMQTemplate.sendMessageInTransaction("order_topic", request, () -> { // 事务回滚回调System.out.println("本地事务回滚,消息未发送!");return null;});}
}
3. 消费者代码(OrderConsumer.java)
@Service
public class OrderConsumer {@Autowiredprivate OrderRepository orderRepository;@Autowiredprivate PaymentService paymentService;@Autowiredprivate InventoryService inventoryService;@RocketMQListener(topics = "order_topic",consumerGroup = "order_consumer_group",acknowledge-mode = AcknowledgeMode.AUTO)public void listen(OrderRequest request) {try {// 1. 生成订单记录Order order = new Order();order.setUserId(request.getUserId());order.setAmount(request.getAmount());order.setSku(request.getSku());orderRepository.save(order);// 2. 扣款(外部服务调用)paymentService.charge(request.getUserId(), request.getAmount());// 3. 发送物流通知(模拟成功)System.out.println("物流已通知,订单号: " + order.getId());} catch (Exception e) {// 4. 异常处理:触发重试或补偿System.out.println("处理失败,触发重试! 订单号: " + request.getOrderNo());throw new RuntimeException("订单处理失败", e);}}
}
4. 支付服务(PaymentService.java)
@Service
public class PaymentService {@Autowiredprivate PaymentRepository paymentRepository;public void charge(String userId, BigDecimal amount) {// 模拟支付失败(30%概率)if (Math.random() < 0.3) {throw new RuntimeException("支付失败,用户: " + userId);}Payment payment = new Payment();payment.setUserId(userId);payment.setAmount(amount);payment.setStatus("SUCCESS");paymentRepository.save(payment);}
}
五、数据库表设计
1. 订单表(orders)
CREATE TABLE orders (id BIGINT PRIMARY KEY AUTO_INCREMENT,user_id VARCHAR(50) NOT NULL,amount DECIMAL(10,2) NOT NULL,sku VARCHAR(50) NOT NULL,status TINYINT DEFAULT 0 COMMENT '0-待支付,1-已支付,2-已发货'
);
2. 支付表(payments)
CREATE TABLE payments (id BIGINT PRIMARY KEY AUTO_INCREMENT,user_id VARCHAR(50) NOT NULL,amount DECIMAL(10,2) NOT NULL,status ENUM('SUCCESS', 'FAILED') DEFAULT 'SUCCESS'
);
六、测试与验证
1. 正常流程
• 步骤:
- 发送创建订单请求(扣减库存 + 发送事务消息)。
- 消费者处理消息(生成订单 + 扣款 + 物流通知)。
• 预期结果:
• 库存减少,订单和支付记录生成,物流通知成功。
2. 异常流程(支付失败)
• 步骤:
- 发送创建订单请求。
- 消费者处理时支付失败(抛出异常)。
- RocketMQ自动重试(默认3次)。
- 重试失败后消息转入DLQ。
• 预期结果:
• 库存已恢复(通过本地事务回滚)。
• 订单未生成,支付记录未插入。
3. DLQ处理
• 操作:手动消费DLQ中的消息,排查支付失败原因(如用户余额不足)。
• 代码示例:
@RocketMQListener(topics = "order_topic_DLQ",consumerGroup = "order_consumer_group_dlq"
)
public void listenDLQ(OrderRequest request) {System.out.println("处理死信消息: " + request.getOrderNo());// 人工干预逻辑(如短信通知用户)
}
七、关键机制说明
1. 事务消息与本地事务绑定
• 代码示例:sendMessageInTransaction
方法将消息发送与本地事务提交原子化。
• 流程:
• 本地事务成功 → RocketMQ持久化消息。
• 本地事务失败 → RocketMQ丢弃消息。
2. 自动重试与死信队列
• 配置:max-reconsume-times=5
表示最大重试5次。
• DLQ Topic:默认死信队列名称为 order_topic_Retry
,可通过 spring.rabbitmq.listener.defaultDLQ
配置。
3. ACK确认机制
• 自动ACK:消费者处理完消息后自动发送确认,RocketMQ删除消息。
• 手动ACK(可选):通过 AcknowledgeMode.MANUAL
控制。
八、生产环境优化建议
- 持久化配置:
• 确保storePathCommitLog
和storePathConsumeQueue
指向持久化磁盘路径。 - 多Broker集群:
• 部署多个Broker节点,配置brokerRole=SYNC_MASTER
实现高可用。 - 监控与报警:
• 监控ConsumerLag
和MessagesPending
指标,阈值报警。 - 日志记录:
• 启用RocketMQ日志(log4j2.xml
),记录消息生产、消费详情。
九、总结
通过本Demo,你已掌握以下核心技能:
- 事务消息:结合本地事务实现强一致性。
- 自动重试:处理临时性故障(如网络抖动)。
- 死信队列:隔离无法处理的异常消息。
- 监控与运维:通过指标和日志保障系统稳定性。
下一步行动:
• 将Demo部署到Docker容器,模拟高并发场景。
• 结合Seata框架实现更复杂的分布式事务(如订单-库存-支付三阶段)。