您的位置:首页 > 房产 > 家装 > 电商小程序开发定制_投放广告网站_网络营销策划方案ppt模板_pc网站优化排名

电商小程序开发定制_投放广告网站_网络营销策划方案ppt模板_pc网站优化排名

2025/7/25 10:36:08 来源:https://blog.csdn.net/weixin_44111939/article/details/146931842  浏览:    关键词:电商小程序开发定制_投放广告网站_网络营销策划方案ppt模板_pc网站优化排名
电商小程序开发定制_投放广告网站_网络营销策划方案ppt模板_pc网站优化排名

文章目录

  • 一、RocketMQ
    • 1.核心组件与架构
    • 2.消息模型与核心概念
    • 3.关键特性
    • 4.典型应用场景
    • 5.模型图
  • 二、RocketMQ使用
        • 注:(本文基于Windows安装并采用 JDK8 + Spring Boot 2.7.6 + ​rocketmq-spring-boot-starter:2.3.1 + ​RocketMQ 5.3.0环境)
    • 1.下载
    • 2.配置环境变量
      • 2.1.在系统变量里新增ROCKETMQ_HOME
      • 2.2.在系统变量Path中新增
    • 3.启动
      • 3.1.启动NameServer
      • 3.2.启动Broker
      • 3.3.脚本启动
  • 三、可视化工具RocketMQ-Dashboard1.0.0使用
    • 1.下载
    • 2.运行
      • 2.1.修改application.properties文件
      • 2.2.启动
      • 2.3.访问界面
      • 2.4.打包jar使用
  • 四、Spring Boot使用RocketMQ
    • 1.pom引入关键jar
    • 2.yml文件引入配置
    • 3.消息体创建
    • 4.Producer实现
    • 5.PushConsumer实现
    • 6.TransactionConsumer实现
    • 7.事务消息监听器实现类
    • 8.Controller测试发送
    • 9.测试验证
  • 总结

一、RocketMQ

RocketMQ官网文档

RocketMQ 是由阿里巴巴开源的分布式消息中间件,现为 Apache 顶级项目,专为高并发、高可靠场景设计,广泛应用于电商、金融、物流等领域。其核心架构由 ​NameServer、Broker、Producer 和 ​Consumer 四大组件构成,支持异步解耦、流量削峰、顺序事务等复杂业务需求。

1.核心组件与架构

​NameServer
轻量级元数据管理服务,负责 Broker 的注册与路由信息维护,无状态设计支持横向扩展。多个 NameServer 节点独立运行,避免单点故障。

​Broker
消息存储与转发的核心节点,采用主从架构(Master-Slave)实现高可用。数据通过顺序写盘(CommitLog)和索引(ConsumeQueue)结合的方式持久化,支持同步/异步复制,确保消息零丢失。

​Producer/Consumer
​Producer:支持同步、异步、单向发送模式,可根据哈希或轮询策略将消息分发到不同队列。
​Consumer:支持集群消费(负载均衡)和广播消费(全量订阅),消费失败时触发重试机制,最终进入死信队列(DLQ)。

2.消息模型与核心概念

​主题(Topic)​
消息的逻辑分类容器,例如电商场景中的订单、支付等业务各对应独立主题。每个主题由多个队列(MessageQueue)组成,支持水平扩展。

​队列(MessageQueue)​
消息存储的最小物理单元,队列内消息按写入顺序存储,天然支持顺序消费。生产者通过轮询或哈希算法将消息分发到不同队列,消费者组内实例并行处理不同队列以提升吞吐量。

​标签(Tag)​
主题下的次级分类,用于精细化消息过滤。例如订单主题中可通过 TagA 标识支付成功消息,消费者仅订阅特定标签以减少无效数据传输。

3.关键特性

​高性能与高可靠:单机吞吐量达十万级,支持万亿级消息堆积且性能不衰减。通过同步刷盘、主从冗余、事务消息等机制保障数据可靠。

​丰富的消息类型:支持普通消息、顺序消息(FIFO)、延时消息(精确到秒级)、事务消息(分布式事务一致性)等。

​云原生与弹性扩展:腾讯云、火山引擎等平台提供 Serverless 化部署,支持按消息量弹性扩缩容,存储按实际使用计费,降低运维成本。

4.典型应用场景

​异步解耦:订单系统与库存、支付系统通过消息队列解耦,提升系统响应速度。

​流量削峰:秒杀场景中,请求先写入队列,下游服务按处理能力消费,避免系统过载。

​日志收集:实时采集应用日志,推送至大数据平台(如 Flink、ELK)进行分析。

RocketMQ 凭借其金融级稳定性(历经阿里双十一万亿级流量验证),已成为企业构建分布式系统的核心中间件

5.模型图

官网模型图如下:
在这里插入图片描述


二、RocketMQ使用

注:(本文基于Windows安装并采用 JDK8 + Spring Boot 2.7.6 + ​rocketmq-spring-boot-starter:2.3.1 + ​RocketMQ 5.3.0环境)

1.下载

官网下载

按需下载自己需要的版本即可。将下载的zip文件进行解压,然后配置环境变量。
在这里插入图片描述
在这里插入图片描述

2.配置环境变量

右键“此电脑”->“属性”-》“高级系统设置”-》“环境变量”

2.1.在系统变量里新增ROCKETMQ_HOME

ROCKETMQ_HOME
值为解压路径:D:\MY_CODE\RocketMQ\rocketmq-all-5.3.0-bin-release

在这里插入图片描述

2.2.在系统变量Path中新增

%ROCKETMQ_HOME%\bin

在这里插入图片描述

3.启动

RocketMQ 需要启动两个服务:NameServer 和 Broker。先启动NameServer后启动Broker。打开CMD直接命令启动。

3.1.启动NameServer

start mqnamesrv.cmd

启动成功如下图所示:(boot success即可)

在这里插入图片描述

3.2.启动Broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

启动成功如下图所示:(boot success即可)
在这里插入图片描述

NameServer和Broker都启动成功代表服务启动成功了,可使用可视化工具RocketMQ-Dashboard进行连接判断服务是否可用。

3.3.脚本启动

每次启动服务都需要到CMD启动NameServer和Broker太麻烦了,整合成一个.bat脚本启动比较方便。

因为NameServer和Broker有先后关系,所以等NameServer启动后延迟500ms再启动Broker。

@echo off
setlocal enabledelayedexpansionREM 启动 NameServer(后台运行)
echo [INFO] 正在启动 NameServer...
start /B mqnamesrv.cmdREM 生成并执行 500ms 延迟脚本
echo [INFO] 等待 NameServer 初始化(500ms)...
echo WScript.Sleep 500 > delay.vbs
cscript //nologo delay.vbs
del /f /q delay.vbs >nul 2>&1REM 启动 Broker(指定 NameServer 地址)
echo [INFO] 正在启动 Broker...
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=trueexit

在这里插入图片描述

在这里插入图片描述


三、可视化工具RocketMQ-Dashboard1.0.0使用

下载RocketMQ-Dashboard源码本地运行,也可打包成jar来运行。

1.下载

官网下载

目前有两个版本,可按需下载。
在这里插入图片描述

2.运行

下载源码后解压。使用IDEA打开源码,装载pom文件,修改rocketmq.config.namesrvAddr为你自己服务地址,启动服务,访问http://localhost:8080/(端口可自行修改)即可。

2.1.修改application.properties文件

server.port默认为8080,可修改,防止冲突。

rocketmq.config.namesrvAddr填上自己的服务地址。

在这里插入图片描述

2.2.启动

在这里插入图片描述

2.3.访问界面

http://localhost:8080/(http://localhost:9091/)

具体使用就不详细讲述了,可自行下载使用。
在这里插入图片描述

2.4.打包jar使用

使用IDEA中maven功能进行打包,打包后在target中找到jar包。
在这里插入图片描述
在这里插入图片描述

将jar复制出来,写.bat脚本运行jar。后续直接启动脚本即可。
在这里插入图片描述

.bat脚本内容如下:

@echo off
setlocal enabledelayedexpansionREM 配置参数(可修改)
set DASHBOARD_PORT=9091
set NAMESRV_ADDR=127.0.0.1:9876echo 正在启动 RocketMQ Dashboard...
echo 控制台端口: %DASHBOARD_PORT%
echo NameServer地址: %NAMESRV_ADDR%java -Dserver.port=%DASHBOARD_PORT% ^-Drocketmq.config.namesrvAddr=%NAMESRV_ADDR% ^-jar rocketmq-dashboard-1.0.0.jarif %errorlevel% neq 0 (echo 启动失败,请检查:echo 1. 当前目录是否存在 rocketmq-dashboard-1.0.0.jarecho 2. Java环境变量是否配置正确
)
pause

四、Spring Boot使用RocketMQ

1.pom引入关键jar

<properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.7.6</spring-boot.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring Boot Starter for RocketMQ --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.1</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion></exclusions></dependency><!-- RocketMQ Client --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version></dependency><!-- Lombok 简化代码 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- 引入 FastJSON 依赖 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency></dependencies>

2.yml文件引入配置

# rocketMQ配置
rocketmq:name-server: 127.0.0.1:9876producer:group: demo-producer-group  # 生产者组send-message-timeout: 3000  # 消息发送超时时长,默认3sretry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2consumer:group: demo-consumer-group  # 消费者组max-reconsume-times: 3       # 消费失败最大重试次数

3.消息体创建

@Data
@AllArgsConstructor
@NoArgsConstructor
public class DemoMessage {private String id;// 消息内容private String content;// 时间戳private LocalDateTime timestamp;
}

4.Producer实现

举例了普通消息、同步消息、异步消息、事务消息四种类型,还有很多包括延时消息、单向消息等就没一一列举了。

其中普通消息、同步消息、异步消息使用同一个demo-topic,采用Tag来过滤消费。

**特别注意:**在消息头中设置Tag不生效,可采用topic拼接Tag的方式来添加( “topic:TagA”)。

@Slf4j
@Service
public class ProducerService {// 普通消息主题private static final String DEMO_TOPIC = "demo-topic";// 事务消息主题private static final String TX_DEMO_TOPIC = "tx-demo-topic";@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 普通发送*/public void send(DemoMessage demoMessage) {Message<DemoMessage> msg = MessageBuilder.withPayload(demoMessage).setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
//                .setHeader(RocketMQHeaders.TAGS, "TagD")  // 设置Tag过滤.build();// 消息头设置Tag的方式无效。采用主题连接Tag的方式。rocketMQTemplate.send(DEMO_TOPIC + ":TagD", msg);log.info("普通消息发送成功");}/*** 同步发送消息** @param demoMessage* @return*/public String sendSyncMessage(DemoMessage demoMessage) {// 构造消息,指定Topic和TagMessage<DemoMessage> msg = MessageBuilder.withPayload(demoMessage).setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
//                .setHeader(RocketMQHeaders.TAGS, "TagA")  // 设置Tag过滤.build();SendResult result = rocketMQTemplate.syncSend(DEMO_TOPIC + ":TagA", msg);log.info("同步发送成功,消息ID:{}", result.getMsgId());return result.getMsgId();}/*** 异步发送消息** @param demoMessage* @return*/public void sendAsyncMessage(DemoMessage demoMessage) {Message<DemoMessage> msg = MessageBuilder.withPayload(demoMessage).setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
//                .setHeader(RocketMQHeaders.TAGS, "TagB")  // 设置Tag过滤.build();// 异步发送并注册回调rocketMQTemplate.asyncSend(DEMO_TOPIC + ":TagB", msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("异步发送成功:" + sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {log.info("异步发送失败:" + e.getMessage());}});}/*** 发送事务消息** @param demoMessage* @return*/public LocalTransactionState sendTransactionMessage(DemoMessage demoMessage) {Message<DemoMessage> msg = MessageBuilder.withPayload(demoMessage).setHeader(RocketMQHeaders.KEYS, demoMessage.getId())
//                .setHeader(RocketMQHeaders.TAGS, "TagC")  // 设置Tag过滤.build();// 发送事务消息TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(TX_DEMO_TOPIC + ":TagC", msg, null);log.info("事务消息状态:{}", result.getLocalTransactionState());return result.getLocalTransactionState();}
}

5.PushConsumer实现

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}",topic = "demo-topic",selectorExpression = "TagA || TagB",  // 只消费TagA和TagBconsumeMode = ConsumeMode.CONCURRENTLY,     // 并发消费messageModel = MessageModel.CLUSTERING      // 集群模式
)
public class PushConsumer implements RocketMQListener<DemoMessage> {@Overridepublic void onMessage(DemoMessage message) {log.info("PushConsumer收到消息:" + message);if (message.getContent().contains("error")) {throw new RuntimeException("模拟消费失败,触发重试");}}
}

6.TransactionConsumer实现

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "tx-consumer-group",topic = "tx-demo-topic",selectorExpression = "TagC",consumeMode = ConsumeMode.CONCURRENTLY
)
public class TransactionConsumer implements RocketMQListener<DemoMessage> {@Overridepublic void onMessage(DemoMessage message) {log.info("事务消息消费成功:" + message);}
}

7.事务消息监听器实现类

@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {/*** 执行本地事务* @param msg RocketMQ 事务消息对象(包含业务数据)* @param arg 发送事务消息时传入的附加参数* @return RocketMQLocalTransactionState 事务状态(COMMIT/ROLLBACK/UNKNOWN)* @throws RuntimeException 当本地事务执行失败时抛出异常*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 模拟本地事务(如数据库操作)return RocketMQLocalTransactionState.COMMIT;}/*** 检查本地事务状态(用于事务回查)* @param msg RocketMQ 事务消息对象(包含事务ID等信息)* @return RocketMQLocalTransactionState 事务最终状态* @throws RuntimeException 当无法确认事务状态时抛出异常*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 事务回查逻辑return RocketMQLocalTransactionState.COMMIT;}
}

8.Controller测试发送

@RequestMapping("/rocketmq")
@RestController
public class RocketMQController {@Autowiredprivate ProducerService producerService;/*** 普通发送消息** @param message* @return*/@GetMapping("/send/common")public String sendCommonMessage(@RequestParam(value = "message") String message) {DemoMessage demoMessage = new DemoMessage(UUID.randomUUID().toString(),message,LocalDateTime.now());producerService.send(demoMessage);return "消息已发送";}/*** 同步发送消息** @param message* @return*/@GetMapping("/send/sync")public String sendSyncMessage(@RequestParam(value = "message") String message) {DemoMessage demoMessage = new DemoMessage(UUID.randomUUID().toString(),message,LocalDateTime.now());String id = producerService.sendSyncMessage(demoMessage);return "同步发送成功,消息ID:" + id;}/*** 异步发送消息** @param message* @return*/@GetMapping("/send/async")public String sendAsyncMessage(@RequestParam(value = "message") String message) {DemoMessage demoMessage = new DemoMessage(UUID.randomUUID().toString(),message,LocalDateTime.now());producerService.sendAsyncMessage(demoMessage);return "异步发送已触发";}/*** 发送事务消息** @param message* @return*/@GetMapping("/send/transaction")public String sendTransactionMessage(@RequestParam(value = "message") String message) {DemoMessage demoMessage = new DemoMessage(UUID.randomUUID().toString(),message,LocalDateTime.now());LocalTransactionState status = producerService.sendTransactionMessage(demoMessage);return "事务消息状态:" + status;}}

9.测试验证

简单发送消息测试一下,看看生产者和消费者是否打印日志。到可视化工具查看消息情况。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


总结

1、以上就是RocketMQ结合Spring Boot的使用案例,整体还是比较简单,没有举例那种比较复杂的(分布式事务实现等)。
2、SimpleConsumer、PullConsumer没有举例了,大家可自行拓展。
3、顺序消费也没有进行举例,顺序消费让生产者将顺序消息丢到同一个队列里面去消费即可。
4、重复消费,重复消费的话需要保持幂等性,可以借助redis或者mysql来实现。举例mysql可以设置消息key为唯一索引,每次消费消息前将key写入数据库,若已经消费则插入数据库错误,直接返回即可。
5、还有回溯消费、分布式事务等等,大家可拓展学习。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com