您的位置:首页 > 新闻 > 热点要闻 > 网站免费正能量链接_计算机网页设计专业学什么_淘宝关键词top排行榜_怎么做推广

网站免费正能量链接_计算机网页设计专业学什么_淘宝关键词top排行榜_怎么做推广

2025/5/9 15:52:29 来源:https://blog.csdn.net/tang_sy/article/details/146431143  浏览:    关键词:网站免费正能量链接_计算机网页设计专业学什么_淘宝关键词top排行榜_怎么做推广
网站免费正能量链接_计算机网页设计专业学什么_淘宝关键词top排行榜_怎么做推广

以下是一个基于 Spring Boot + RocketMQ 的完整分布式事务实战 Demo,包含事务消息本地事务自动重试死信队列(DLQ) 等核心机制。代码已充分注释,可直接运行。


一、项目结构

src/main/java
├── com.example.rocketmq
│   ├── controller
│   │   └── OrderController.java
│   ├── model
│   │   ├── Order.java
│   │   ├── OrderRequest.java
│   ├── repository
│   │   ├── OrderRepository.java
│   ├── service
│   │   ├── InventoryService.java
│   │   ├── OrderService.java
│   │   ├── PaymentService.java
│   ├── listener
│   │   └── OrderConsumer.java
│   └── RocketMQConfig.java
├── application.yml
└── pom.xml

在这里插入图片描述


二、依赖配置(pom.xml)

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RocketMQ Spring Boot Starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.11.0</version></dependency><!-- MySQL Driver --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><!-- Spring Data JPA --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency>
</dependencies>

三、配置文件(application.yml)

# RocketMQ配置
rocketmq:producer:name-server: localhost:9876default-topic: order_topicconsumer:name-server: localhost:9876default-topic: order_topicconsumer-group: order_consumer_groupacknowledge-mode: AUTOmax-reconsume-times: 5 # 最大重试次数broker:role: SYNC_MASTER # 同步复制模式store-path-commit-log: /data/rocketmq/commitlogstore-path-consume-queue: /data/rocketmq/consumequeue# 数据库配置
spring:datasource:url: jdbc:mysql://localhost:3306/rocketmq_db?useSSL=false&serverTimezone=UTCusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driverhikari:maximum-pool-size: 20jpa:hibernate:ddl-auto: updateshow-sql: true

四、核心代码实现

1. 实体类(Order.java)
@Entity
@Table(name = "orders")
@Data
public class Order {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String userId;private BigDecimal amount;private String sku;private Integer status; // 0-待支付,1-已支付,2-已发货
}
2. 生产者代码(OrderService.java)
@Service
public class OrderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate InventoryService inventoryService;@Autowiredprivate PaymentService paymentService;@Transactional // 本地事务public void createOrder(OrderRequest request) {// 1. 扣减库存(本地事务)inventoryService.deduct(request.getSku());// 2. 发送事务消息(与本地事务绑定)rocketMQTemplate.sendMessageInTransaction("order_topic", request, () -> { // 事务回滚回调System.out.println("本地事务回滚,消息未发送!");return null;});}
}
3. 消费者代码(OrderConsumer.java)
@Service
public class OrderConsumer {@Autowiredprivate OrderRepository orderRepository;@Autowiredprivate PaymentService paymentService;@Autowiredprivate InventoryService inventoryService;@RocketMQListener(topics = "order_topic",consumerGroup = "order_consumer_group",acknowledge-mode = AcknowledgeMode.AUTO)public void listen(OrderRequest request) {try {// 1. 生成订单记录Order order = new Order();order.setUserId(request.getUserId());order.setAmount(request.getAmount());order.setSku(request.getSku());orderRepository.save(order);// 2. 扣款(外部服务调用)paymentService.charge(request.getUserId(), request.getAmount());// 3. 发送物流通知(模拟成功)System.out.println("物流已通知,订单号: " + order.getId());} catch (Exception e) {// 4. 异常处理:触发重试或补偿System.out.println("处理失败,触发重试! 订单号: " + request.getOrderNo());throw new RuntimeException("订单处理失败", e);}}
}
4. 支付服务(PaymentService.java)
@Service
public class PaymentService {@Autowiredprivate PaymentRepository paymentRepository;public void charge(String userId, BigDecimal amount) {// 模拟支付失败(30%概率)if (Math.random() < 0.3) {throw new RuntimeException("支付失败,用户: " + userId);}Payment payment = new Payment();payment.setUserId(userId);payment.setAmount(amount);payment.setStatus("SUCCESS");paymentRepository.save(payment);}
}

五、数据库表设计

1. 订单表(orders)
CREATE TABLE orders (id BIGINT PRIMARY KEY AUTO_INCREMENT,user_id VARCHAR(50) NOT NULL,amount DECIMAL(10,2) NOT NULL,sku VARCHAR(50) NOT NULL,status TINYINT DEFAULT 0 COMMENT '0-待支付,1-已支付,2-已发货'
);
2. 支付表(payments)
CREATE TABLE payments (id BIGINT PRIMARY KEY AUTO_INCREMENT,user_id VARCHAR(50) NOT NULL,amount DECIMAL(10,2) NOT NULL,status ENUM('SUCCESS', 'FAILED') DEFAULT 'SUCCESS'
);

六、测试与验证

1. 正常流程

步骤

  1. 发送创建订单请求(扣减库存 + 发送事务消息)。
  2. 消费者处理消息(生成订单 + 扣款 + 物流通知)。
    预期结果
    • 库存减少,订单和支付记录生成,物流通知成功。
2. 异常流程(支付失败)

步骤

  1. 发送创建订单请求。
  2. 消费者处理时支付失败(抛出异常)。
  3. RocketMQ自动重试(默认3次)。
  4. 重试失败后消息转入DLQ。
    预期结果
    • 库存已恢复(通过本地事务回滚)。
    • 订单未生成,支付记录未插入。
3. DLQ处理

操作:手动消费DLQ中的消息,排查支付失败原因(如用户余额不足)。
代码示例

@RocketMQListener(topics = "order_topic_DLQ",consumerGroup = "order_consumer_group_dlq"
)
public void listenDLQ(OrderRequest request) {System.out.println("处理死信消息: " + request.getOrderNo());// 人工干预逻辑(如短信通知用户)
}

七、关键机制说明

1. 事务消息与本地事务绑定

代码示例sendMessageInTransaction 方法将消息发送与本地事务提交原子化。
流程
• 本地事务成功 → RocketMQ持久化消息。
• 本地事务失败 → RocketMQ丢弃消息。

2. 自动重试与死信队列

配置max-reconsume-times=5 表示最大重试5次。
DLQ Topic:默认死信队列名称为 order_topic_Retry,可通过 spring.rabbitmq.listener.defaultDLQ 配置。

3. ACK确认机制

自动ACK:消费者处理完消息后自动发送确认,RocketMQ删除消息。
手动ACK(可选):通过 AcknowledgeMode.MANUAL 控制。


八、生产环境优化建议

  1. 持久化配置
    • 确保 storePathCommitLogstorePathConsumeQueue 指向持久化磁盘路径。
  2. 多Broker集群
    • 部署多个Broker节点,配置 brokerRole=SYNC_MASTER 实现高可用。
  3. 监控与报警
    • 监控 ConsumerLagMessagesPending 指标,阈值报警。
  4. 日志记录
    • 启用RocketMQ日志(log4j2.xml),记录消息生产、消费详情。

九、总结

通过本Demo,你已掌握以下核心技能:

  1. 事务消息:结合本地事务实现强一致性。
  2. 自动重试:处理临时性故障(如网络抖动)。
  3. 死信队列:隔离无法处理的异常消息。
  4. 监控与运维:通过指标和日志保障系统稳定性。

下一步行动
• 将Demo部署到Docker容器,模拟高并发场景。
• 结合Seata框架实现更复杂的分布式事务(如订单-库存-支付三阶段)。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com